You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by jg...@apache.org on 2010/05/27 01:31:57 UTC

svn commit: r948634 [2/2] - in /hadoop/hdfs/trunk: ./ ivy/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/security/ src/java/org/apache/hadoop/hdfs/security/token/block/ src/java/org/apache/ha...

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed May 26 23:31:56 2010
@@ -39,8 +39,8 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
-import org.apache.hadoop.hdfs.security.AccessTokenHandler;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.MetaDataInputStream;
 import org.apache.hadoop.io.IOUtils;
@@ -49,6 +49,8 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
 import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
 
@@ -126,27 +128,33 @@ class DataXceiver extends DataTransferPr
    * Read a block from the disk.
    */
   @Override
-  protected void opReadBlock(DataInputStream in,
-      long blockId, long blockGs, long startOffset, long length,
-      String clientName, BlockAccessToken accessToken) throws IOException {
+  protected void opReadBlock(DataInputStream in, long blockId, long blockGs,
+      long startOffset, long length, String clientName,
+      Token<BlockTokenIdentifier> blockToken) throws IOException {
     final Block block = new Block(blockId, 0 , blockGs);
     OutputStream baseStream = NetUtils.getOutputStream(s, 
         datanode.socketWriteTimeout);
     DataOutputStream out = new DataOutputStream(
                  new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
-    
-    if (datanode.isAccessTokenEnabled
-        && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
-            AccessTokenHandler.AccessMode.READ)) {
+
+    if (datanode.isBlockTokenEnabled) {
       try {
-        ERROR_ACCESS_TOKEN.write(out);
-        out.flush();
-        throw new IOException("Access token verification failed, for client "
-            + remoteAddress + " for OP_READ_BLOCK for block " + block);
-      } finally {
-        IOUtils.closeStream(out);
+        datanode.blockTokenSecretManager.checkAccess(blockToken, null, block,
+            BlockTokenSecretManager.AccessMode.READ);
+      } catch (InvalidToken e) {
+        try {
+          ERROR_ACCESS_TOKEN.write(out);
+          out.flush();
+          LOG.warn("Block token verification failed, for client "
+              + remoteAddress + " for OP_READ_BLOCK for block " + block + " : "
+              + e.getLocalizedMessage());
+          throw e;
+        } finally {
+          IOUtils.closeStream(out);
+        }
       }
     }
+  
     // send the block
     BlockSender blockSender = null;
     final String clientTraceFmt =
@@ -212,7 +220,7 @@ class DataXceiver extends DataTransferPr
       int pipelineSize, BlockConstructionStage stage,
       long newGs, long minBytesRcvd, long maxBytesRcvd,
       String client, DatanodeInfo srcDataNode, DatanodeInfo[] targets,
-      BlockAccessToken accessToken) throws IOException {
+      Token<BlockTokenIdentifier> blockToken) throws IOException {
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
@@ -228,19 +236,24 @@ class DataXceiver extends DataTransferPr
     DataOutputStream replyOut = null;   // stream to prev target
     replyOut = new DataOutputStream(
                    NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
-    if (datanode.isAccessTokenEnabled
-        && !datanode.accessTokenHandler.checkAccess(accessToken, null, block
-            .getBlockId(), AccessTokenHandler.AccessMode.WRITE)) {
+    if (datanode.isBlockTokenEnabled) {
       try {
-        if (client.length() != 0) {
-          ERROR_ACCESS_TOKEN.write(replyOut);
-          Text.writeString(replyOut, datanode.dnRegistration.getName());
-          replyOut.flush();
+        datanode.blockTokenSecretManager.checkAccess(blockToken, null, block,
+            BlockTokenSecretManager.AccessMode.WRITE);
+      } catch (InvalidToken e) {
+        try {
+          if (client.length() != 0) {
+            ERROR_ACCESS_TOKEN.write(replyOut);
+            Text.writeString(replyOut, datanode.dnRegistration.getName());
+            replyOut.flush();
+          }
+          LOG.warn("Block token verification failed, for client "
+              + remoteAddress + " for OP_WRITE_BLOCK for block " + block
+              + " : " + e.getLocalizedMessage());
+          throw e;
+        } finally {
+          IOUtils.closeStream(replyOut);
         }
-        throw new IOException("Access token verification failed, for client "
-            + remoteAddress + " for OP_WRITE_BLOCK for block " + block);
-      } finally {
-        IOUtils.closeStream(replyOut);
       }
     }
 
@@ -292,7 +305,7 @@ class DataXceiver extends DataTransferPr
           DataTransferProtocol.Sender.opWriteBlock(mirrorOut,
               blockId, blockGs, 
               pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, client, 
-              srcDataNode, targets, accessToken);
+              srcDataNode, targets, blockToken);
 
           if (blockReceiver != null) { // send checksum header
             blockReceiver.writeChecksumHeader(mirrorOut);
@@ -395,22 +408,27 @@ class DataXceiver extends DataTransferPr
    * Get block checksum (MD5 of CRC32).
    */
   @Override
-  protected void opBlockChecksum(DataInputStream in,
-      long blockId, long blockGs, BlockAccessToken accessToken) throws IOException {
+  protected void opBlockChecksum(DataInputStream in, long blockId,
+      long blockGs, Token<BlockTokenIdentifier> blockToken) throws IOException {
     final Block block = new Block(blockId, 0 , blockGs);
     DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(s,
         datanode.socketWriteTimeout));
-    if (datanode.isAccessTokenEnabled
-        && !datanode.accessTokenHandler.checkAccess(accessToken, null, block
-            .getBlockId(), AccessTokenHandler.AccessMode.READ)) {
+    if (datanode.isBlockTokenEnabled) {
       try {
-        ERROR_ACCESS_TOKEN.write(out);
-        out.flush();
-        throw new IOException(
-            "Access token verification failed, for client " + remoteAddress
-                + " for OP_BLOCK_CHECKSUM for block " + block);
-      } finally {
-        IOUtils.closeStream(out);
+        datanode.blockTokenSecretManager.checkAccess(blockToken, null, block,
+            BlockTokenSecretManager.AccessMode.READ);
+      } catch (InvalidToken e) {
+        try {
+          ERROR_ACCESS_TOKEN.write(out);
+          out.flush();
+          LOG.warn("Block token verification failed, for client "
+              + remoteAddress + " for OP_BLOCK_CHECKSUM for block " + block
+              + " : " + e.getLocalizedMessage());
+          throw e;
+        } finally {
+          IOUtils.closeStream(out);
+        }
+
       }
     }
 
@@ -454,17 +472,22 @@ class DataXceiver extends DataTransferPr
    * Read a block from the disk and then sends it to a destination.
    */
   @Override
-  protected void opCopyBlock(DataInputStream in,
-      long blockId, long blockGs, BlockAccessToken accessToken) throws IOException {
+  protected void opCopyBlock(DataInputStream in, long blockId, long blockGs,
+      Token<BlockTokenIdentifier> blockToken) throws IOException {
     // Read in the header
     Block block = new Block(blockId, 0, blockGs);
-    if (datanode.isAccessTokenEnabled
-        && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
-            AccessTokenHandler.AccessMode.COPY)) {
-      LOG.warn("Invalid access token in request from "
-          + remoteAddress + " for OP_COPY_BLOCK for block " + block);
-      sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
-      return;
+    if (datanode.isBlockTokenEnabled) {
+      try {
+        datanode.blockTokenSecretManager.checkAccess(blockToken, null, block,
+            BlockTokenSecretManager.AccessMode.COPY);
+      } catch (InvalidToken e) {
+        LOG.warn("Invalid access token in request from " + remoteAddress
+            + " for OP_COPY_BLOCK for block " + block + " : "
+            + e.getLocalizedMessage());
+        sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
+        return;
+      }
+
     }
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
@@ -526,17 +549,21 @@ class DataXceiver extends DataTransferPr
   @Override
   protected void opReplaceBlock(DataInputStream in,
       long blockId, long blockGs, String sourceID, DatanodeInfo proxySource,
-      BlockAccessToken accessToken) throws IOException {
+      Token<BlockTokenIdentifier> blockToken) throws IOException {
     /* read header */
     final Block block = new Block(blockId, dataXceiverServer.estimateBlockSize,
         blockGs);
-    if (datanode.isAccessTokenEnabled
-        && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
-            AccessTokenHandler.AccessMode.REPLACE)) {
-      LOG.warn("Invalid access token in request from "
-          + remoteAddress + " for OP_REPLACE_BLOCK for block " + block);
-      sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
-      return;
+    if (datanode.isBlockTokenEnabled) {
+      try {
+        datanode.blockTokenSecretManager.checkAccess(blockToken, null, block,
+            BlockTokenSecretManager.AccessMode.REPLACE);
+      } catch (InvalidToken e) {
+        LOG.warn("Invalid access token in request from " + remoteAddress
+            + " for OP_REPLACE_BLOCK for block " + block + " : "
+            + e.getLocalizedMessage());
+        sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
+        return;
+      }
     }
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
@@ -567,7 +594,7 @@ class DataXceiver extends DataTransferPr
 
       /* send request to the proxy */
       DataTransferProtocol.Sender.opCopyBlock(proxyOut, block.getBlockId(),
-          block.getGenerationStamp(), accessToken);
+          block.getGenerationStamp(), blockToken);
 
       // receive the response from the proxy
       proxyReply = new DataInputStream(new BufferedInputStream(

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java Wed May 26 23:31:56 2010
@@ -39,10 +39,12 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 
 @InterfaceAudience.Private
@@ -379,7 +381,7 @@ public class DatanodeJspHelper {
 
     final DFSClient dfs = getDFSClient(ugi, datanode.getNameNodeAddr(), conf);
 
-    BlockAccessToken accessToken = BlockAccessToken.DUMMY_TOKEN;
+    Token<BlockTokenIdentifier> blockToken = BlockTokenSecretManager.DUMMY_TOKEN;
     if (conf.getBoolean(
         DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, 
         DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT)) {
@@ -392,7 +394,7 @@ public class DatanodeJspHelper {
       }
       for (int i = 0; i < blks.size(); i++) {
         if (blks.get(i).getBlock().getBlockId() == blockId) {
-          accessToken = blks.get(i).getAccessToken();
+          blockToken = blks.get(i).getBlockToken();
           break;
         }
       }
@@ -558,7 +560,7 @@ public class DatanodeJspHelper {
     out.print("<textarea cols=\"100\" rows=\"25\" wrap=\"virtual\" style=\"width:100%\" READONLY>");
     try {
       JspHelper.streamBlockInAscii(new InetSocketAddress(req.getServerName(),
-          datanodePort), blockId, accessToken, genStamp, blockSize,
+          datanodePort), blockId, blockToken, genStamp, blockSize,
           startOffset, chunkSizeToView, out, conf);
     } catch (Exception e) {
       out.print(e);
@@ -627,7 +629,7 @@ public class DatanodeJspHelper {
     LocatedBlock lastBlk = blocks.get(blocks.size() - 1);
     long blockSize = lastBlk.getBlock().getNumBytes();
     long blockId = lastBlk.getBlock().getBlockId();
-    BlockAccessToken accessToken = lastBlk.getAccessToken();
+    Token<BlockTokenIdentifier> accessToken = lastBlk.getBlockToken();
     long genStamp = lastBlk.getBlock().getGenerationStamp();
     DatanodeInfo chosenNode;
     try {

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed May 26 23:31:56 2010
@@ -23,8 +23,8 @@ import org.apache.hadoop.HadoopIllegalAr
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.hadoop.hdfs.security.AccessTokenHandler;
-import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
@@ -164,10 +164,10 @@ public class FSNamesystem implements FSC
   private FSNamesystemMetrics myFSMetrics;
   private long capacityTotal = 0L, capacityUsed = 0L, capacityRemaining = 0L;
   private int totalLoad = 0;
-  boolean isAccessTokenEnabled;
-  AccessTokenHandler accessTokenHandler;
-  private long accessKeyUpdateInterval;
-  private long accessTokenLifetime;
+  boolean isBlockTokenEnabled;
+  BlockTokenSecretManager blockTokenSecretManager;
+  private long blockKeyUpdateInterval;
+  private long blockTokenLifetime;
   
   // Scan interval is not configurable.
   private final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL = 3600000; // 1 hour
@@ -301,9 +301,9 @@ public class FSNamesystem implements FSC
     this.safeMode = new SafeModeInfo(conf);
     this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
                         conf.get("dfs.hosts.exclude",""));
-    if (isAccessTokenEnabled) {
-      accessTokenHandler = new AccessTokenHandler(true,
-          accessKeyUpdateInterval, accessTokenLifetime);
+    if (isBlockTokenEnabled) {
+      blockTokenSecretManager = new BlockTokenSecretManager(true,
+          blockKeyUpdateInterval, blockTokenLifetime);
     }
   }
 
@@ -455,20 +455,20 @@ public class FSNamesystem implements FSC
     this.accessTimePrecision = conf.getLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 0);
     this.supportAppends = conf.getBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY,
                                       DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT);
-    this.isAccessTokenEnabled = conf.getBoolean(
+    this.isBlockTokenEnabled = conf.getBoolean(
         DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, 
         DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT);
-    if (isAccessTokenEnabled) {
-      this.accessKeyUpdateInterval = conf.getLong(
+    if (isBlockTokenEnabled) {
+      this.blockKeyUpdateInterval = conf.getLong(
           DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY, 
           DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_DEFAULT) * 60 * 1000L; // 10 hrs
-      this.accessTokenLifetime = conf.getLong(
+      this.blockTokenLifetime = conf.getLong(
           DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY, 
           DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT) * 60 * 1000L; // 10 hrs
     }
-    LOG.info("isAccessTokenEnabled=" + isAccessTokenEnabled
-        + " accessKeyUpdateInterval=" + accessKeyUpdateInterval / (60 * 1000)
-        + " min(s), accessTokenLifetime=" + accessTokenLifetime / (60 * 1000)
+    LOG.info("isBlockTokenEnabled=" + isBlockTokenEnabled
+        + " blockKeyUpdateInterval=" + blockKeyUpdateInterval / (60 * 1000)
+        + " min(s), blockTokenLifetime=" + blockTokenLifetime / (60 * 1000)
         + " min(s)");
   }
 
@@ -632,9 +632,9 @@ public class FSNamesystem implements FSC
    * 
    * @return current access keys
    */
-  ExportedAccessKeys getAccessKeys() {
-    return isAccessTokenEnabled ? accessTokenHandler.exportKeys()
-        : ExportedAccessKeys.DUMMY_KEYS;
+  ExportedBlockKeys getBlockKeys() {
+    return isBlockTokenEnabled ? blockTokenSecretManager.exportKeys()
+        : ExportedBlockKeys.DUMMY_KEYS;
   }
 
   /**
@@ -802,9 +802,9 @@ public class FSNamesystem implements FSC
   LocatedBlock createLocatedBlock(final Block b, final DatanodeInfo[] locations,
       final long offset, final boolean corrupt) throws IOException {
     final LocatedBlock lb = new LocatedBlock(b, locations, offset, corrupt);
-    if (isAccessTokenEnabled) {
-      lb.setAccessToken(accessTokenHandler.generateToken(b.getBlockId(),
-          EnumSet.of(AccessTokenHandler.AccessMode.READ)));
+    if (isBlockTokenEnabled) {
+      lb.setBlockToken(blockTokenSecretManager.generateToken(b,
+          EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
     }
     return lb;
   }
@@ -1364,9 +1364,9 @@ public class FSNamesystem implements FSC
 
           lb = new LocatedBlock(lastBlock, targets, 
                                 fileLength-lastBlock.getNumBytes());
-          if (isAccessTokenEnabled) {
-            lb.setAccessToken(accessTokenHandler.generateToken(lb.getBlock()
-                .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+          if (isBlockTokenEnabled) {
+            lb.setBlockToken(blockTokenSecretManager.generateToken(lb.getBlock(), 
+                EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
           }
 
           // Remove block from replication queue.
@@ -1482,9 +1482,9 @@ public class FSNamesystem implements FSC
         
     // Create next block
     LocatedBlock b = new LocatedBlock(newBlock, targets, fileLength);
-    if (isAccessTokenEnabled) {
-      b.setAccessToken(accessTokenHandler.generateToken(b.getBlock()
-          .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+    if (isBlockTokenEnabled) {
+      b.setBlockToken(blockTokenSecretManager.generateToken(b.getBlock(), 
+          EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
     }
     return b;
   }
@@ -2310,7 +2310,7 @@ public class FSNamesystem implements FSC
                                       nodeReg.getInfoPort(),
                                       nodeReg.getIpcPort());
     nodeReg.updateRegInfo(dnReg);
-    nodeReg.exportedKeys = getAccessKeys();
+    nodeReg.exportedKeys = getBlockKeys();
       
     NameNode.stateChangeLog.info(
                                  "BLOCK* NameSystem.registerDatanode: "
@@ -2523,8 +2523,8 @@ public class FSNamesystem implements FSC
           cmds.add(cmd);
         }
         // check access key update
-        if (isAccessTokenEnabled && nodeinfo.needKeyUpdate) {
-          cmds.add(new KeyUpdateCommand(accessTokenHandler.exportKeys()));
+        if (isBlockTokenEnabled && nodeinfo.needKeyUpdate) {
+          cmds.add(new KeyUpdateCommand(blockTokenSecretManager.exportKeys()));
           nodeinfo.needKeyUpdate = false;
         }
         if (!cmds.isEmpty()) {
@@ -2562,8 +2562,8 @@ public class FSNamesystem implements FSC
   /**
    * Update access keys.
    */
-  void updateAccessKey() throws IOException {
-    this.accessTokenHandler.updateKeys();
+  void updateBlockKey() throws IOException {
+    this.blockTokenSecretManager.updateKeys();
     synchronized (heartbeats) {
       for (DatanodeDescriptor nodeInfo : heartbeats) {
         nodeInfo.needKeyUpdate = true;
@@ -2572,11 +2572,11 @@ public class FSNamesystem implements FSC
   }
 
   /**
-   * Periodically calls heartbeatCheck() and updateAccessKey()
+   * Periodically calls heartbeatCheck() and updateBlockKey()
    */
   class HeartbeatMonitor implements Runnable {
     private long lastHeartbeatCheck;
-    private long lastAccessKeyUpdate;
+    private long lastBlockKeyUpdate;
     /**
      */
     public void run() {
@@ -2587,9 +2587,9 @@ public class FSNamesystem implements FSC
             heartbeatCheck();
             lastHeartbeatCheck = now;
           }
-          if (isAccessTokenEnabled && (lastAccessKeyUpdate + accessKeyUpdateInterval < now)) {
-            updateAccessKey();
-            lastAccessKeyUpdate = now;
+          if (isBlockTokenEnabled && (lastBlockKeyUpdate + blockKeyUpdateInterval < now)) {
+            updateBlockKey();
+            lastBlockKeyUpdate = now;
           }
         } catch (Exception e) {
           FSNamesystem.LOG.error(StringUtils.stringifyException(e));
@@ -4229,9 +4229,9 @@ public class FSNamesystem implements FSC
     // get a new generation stamp and an access token
     block.setGenerationStamp(nextGenerationStamp());
     LocatedBlock locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
-    if (isAccessTokenEnabled) {
-      locatedBlock.setAccessToken(accessTokenHandler.generateToken(
-          block.getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+    if (isBlockTokenEnabled) {
+      locatedBlock.setBlockToken(blockTokenSecretManager.generateToken(
+          block, EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
     }
     return locatedBlock;
   }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed May 26 23:31:56 2010
@@ -54,7 +54,7 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
-import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
@@ -498,8 +498,8 @@ public class NameNode implements Namenod
   }
 
   /** {@inheritDoc} */
-  public ExportedAccessKeys getAccessKeys() throws IOException {
-    return namesystem.getAccessKeys();
+  public ExportedBlockKeys getBlockKeys() throws IOException {
+    return namesystem.getBlockKeys();
   }
 
   @Override // NamenodeProtocol

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Wed May 26 23:31:56 2010
@@ -501,7 +501,7 @@ public class NamenodeFsck {
           BlockReader.newBlockReader(s, targetAddr.toString() + ":" + 
                                                block.getBlockId(), 
                                                block.getBlockId(), 
-                                               lblock.getAccessToken(),
+                                               lblock.getBlockToken(),
                                                block.getGenerationStamp(), 
                                                0, -1,
                                                conf.getInt("io.file.buffer.size", 4096));

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java Wed May 26 23:31:56 2010
@@ -23,7 +23,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
@@ -47,7 +47,7 @@ implements Writable, NodeRegistration {
   }
 
   public StorageInfo storageInfo;
-  public ExportedAccessKeys exportedKeys;
+  public ExportedBlockKeys exportedKeys;
 
   /**
    * Default constructor.
@@ -62,7 +62,7 @@ implements Writable, NodeRegistration {
   public DatanodeRegistration(String nodeName) {
     super(nodeName);
     this.storageInfo = new StorageInfo();
-    this.exportedKeys = new ExportedAccessKeys();
+    this.exportedKeys = new ExportedBlockKeys();
   }
   
   public void setInfoPort(int infoPort) {

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java Wed May 26 23:31:56 2010
@@ -21,24 +21,24 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
 
 public class KeyUpdateCommand extends DatanodeCommand {
-  private ExportedAccessKeys keys;
+  private ExportedBlockKeys keys;
 
   KeyUpdateCommand() {
-    this(new ExportedAccessKeys());
+    this(new ExportedBlockKeys());
   }
 
-  public KeyUpdateCommand(ExportedAccessKeys keys) {
+  public KeyUpdateCommand(ExportedBlockKeys keys) {
     super(DatanodeProtocol.DNA_ACCESSKEYUPDATE);
     this.keys = keys;
   }
 
-  public ExportedAccessKeys getExportedKeys() {
+  public ExportedBlockKeys getExportedKeys() {
     return this.keys;
   }
 

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java Wed May 26 23:31:56 2010
@@ -22,7 +22,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.KerberosInfo;
@@ -74,12 +74,12 @@ public interface NamenodeProtocol extend
   throws IOException;
 
   /**
-   * Get the current access keys
+   * Get the current block keys
    * 
-   * @return ExportedAccessKeys containing current access keys
+   * @return ExportedBlockKeys containing current block keys
    * @throws IOException 
    */
-  public ExportedAccessKeys getAccessKeys() throws IOException;
+  public ExportedBlockKeys getBlockKeys() throws IOException;
 
   /**
    * Get the size of the current edit log (in bytes).

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java Wed May 26 23:31:56 2010
@@ -42,9 +42,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /** Utilities for HDFS tests */
@@ -262,8 +263,9 @@ public class DFSTestUtil {
     return ((DFSClient.DFSDataInputStream) in).getAllBlocks();
   }
 
-  public static BlockAccessToken getAccessToken(FSDataOutputStream out) {
-    return ((DFSOutputStream) out.getWrappedStream()).getAccessToken();
+  public static Token<BlockTokenIdentifier> getBlockToken(
+      FSDataOutputStream out) {
+    return ((DFSOutputStream) out.getWrappedStream()).getBlockToken();
   }
 
   static void setLogLevel2All(org.apache.commons.logging.Log log) {

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java Wed May 26 23:31:56 2010
@@ -92,7 +92,7 @@ public class TestClientBlockVerification
 
     return BlockReader.newBlockReader(
       s, targetAddr.toString()+ ":" + block.getBlockId(), block.getBlockId(),
-      testBlock.getAccessToken(), block.getGenerationStamp(),
+      testBlock.getBlockToken(), block.getGenerationStamp(),
       offset, lenToRead,
       conf.getInt("io.file.buffer.size", 4096));
   }

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Wed May 26 23:31:56 2010
@@ -49,7 +49,7 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.io.IOUtils;
@@ -170,7 +170,7 @@ public class TestDataTransferProtocol ex
     DataTransferProtocol.Sender.opWriteBlock(sendOut, 
         block.getBlockId(), block.getGenerationStamp(), 0,
         stage, newGS, block.getNumBytes(), block.getNumBytes(), "cl", null,
-        new DatanodeInfo[1], BlockAccessToken.DUMMY_TOKEN);
+        new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
     if (eofExcepted) {
       ERROR.write(recvOut);
       sendRecvData(description, true);
@@ -356,7 +356,7 @@ public class TestDataTransferProtocol ex
     DataTransferProtocol.Sender.opWriteBlock(sendOut, 
         newBlockId, 0L, 0,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
-        new DatanodeInfo[1], BlockAccessToken.DUMMY_TOKEN);
+        new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     
     // bad bytes per checksum
@@ -370,7 +370,7 @@ public class TestDataTransferProtocol ex
     DataTransferProtocol.Sender.opWriteBlock(sendOut,
         ++newBlockId, 0L, 0,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
-        new DatanodeInfo[1], BlockAccessToken.DUMMY_TOKEN);
+        new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeInt(512);
     sendOut.writeInt(4);           // size of packet
@@ -392,7 +392,7 @@ public class TestDataTransferProtocol ex
     DataTransferProtocol.Sender.opWriteBlock(sendOut, 
         ++newBlockId, 0L, 0,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
-        new DatanodeInfo[1], BlockAccessToken.DUMMY_TOKEN);
+        new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeInt(512);         // checksum size
     sendOut.writeInt(8);           // size of packet
@@ -423,7 +423,7 @@ public class TestDataTransferProtocol ex
     sendOut.writeLong(fileLen);
     ERROR.write(recvOut);
     Text.writeString(sendOut, "cl");
-    BlockAccessToken.DUMMY_TOKEN.write(sendOut);
+    BlockTokenSecretManager.DUMMY_TOKEN.write(sendOut);
     sendRecvData("Wrong block ID " + newBlockId + " for read", false); 
 
     // negative block start offset
@@ -435,7 +435,7 @@ public class TestDataTransferProtocol ex
     sendOut.writeLong(-1L);
     sendOut.writeLong(fileLen);
     Text.writeString(sendOut, "cl");
-    BlockAccessToken.DUMMY_TOKEN.write(sendOut);
+    BlockTokenSecretManager.DUMMY_TOKEN.write(sendOut);
     sendRecvData("Negative start-offset for read for block " + 
                  firstBlock.getBlockId(), false);
 
@@ -448,7 +448,7 @@ public class TestDataTransferProtocol ex
     sendOut.writeLong(fileLen);
     sendOut.writeLong(fileLen);
     Text.writeString(sendOut, "cl");
-    BlockAccessToken.DUMMY_TOKEN.write(sendOut);
+    BlockTokenSecretManager.DUMMY_TOKEN.write(sendOut);
     sendRecvData("Wrong start-offset for reading block " +
                  firstBlock.getBlockId(), false);
     
@@ -463,7 +463,7 @@ public class TestDataTransferProtocol ex
     sendOut.writeLong(0);
     sendOut.writeLong(-1-random.nextInt(oneMil));
     Text.writeString(sendOut, "cl");
-    BlockAccessToken.DUMMY_TOKEN.write(sendOut);
+    BlockTokenSecretManager.DUMMY_TOKEN.write(sendOut);
     sendRecvData("Negative length for reading block " +
                  firstBlock.getBlockId(), false);
     
@@ -478,7 +478,7 @@ public class TestDataTransferProtocol ex
     sendOut.writeLong(0);
     sendOut.writeLong(fileLen + 1);
     Text.writeString(sendOut, "cl");
-    BlockAccessToken.DUMMY_TOKEN.write(sendOut);
+    BlockTokenSecretManager.DUMMY_TOKEN.write(sendOut);
     sendRecvData("Wrong length for reading block " +
                  firstBlock.getBlockId(), false);
     
@@ -491,7 +491,7 @@ public class TestDataTransferProtocol ex
     sendOut.writeLong(0);
     sendOut.writeLong(fileLen);
     Text.writeString(sendOut, "cl");
-    BlockAccessToken.DUMMY_TOKEN.write(sendOut);
+    BlockTokenSecretManager.DUMMY_TOKEN.write(sendOut);
     readFile(fileSys, file, fileLen);
     } finally {
       cluster.shutdown();

Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/SecurityTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/SecurityTestUtil.java?rev=948634&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/SecurityTestUtil.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/SecurityTestUtil.java Wed May 26 23:31:56 2010
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.security.token.Token;
+
+/** Utilities for security tests */
+public class SecurityTestUtil {
+
+  /**
+   * check if an access token is expired. return true when token is expired,
+   * false otherwise
+   */
+  public static boolean isBlockTokenExpired(Token<BlockTokenIdentifier> token)
+      throws IOException {
+    return BlockTokenSecretManager.isTokenExpired(token);
+  }
+
+  /**
+   * set access token lifetime.
+   */
+  public static void setBlockTokenLifetime(BlockTokenSecretManager handler,
+      long tokenLifetime) {
+    handler.setTokenLifetime(tokenLifetime);
+  }
+
+}

Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java?rev=948634&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java Wed May 26 23:31:56 2010
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.EnumSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.io.TestWritable;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SaslInputStream;
+import org.apache.hadoop.security.SaslRpcClient;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.log4j.Level;
+
+import org.junit.Test;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/** Unit tests for block tokens */
+public class TestBlockToken {
+  public static final Log LOG = LogFactory.getLog(TestBlockToken.class);
+  private static final String ADDRESS = "0.0.0.0";
+
+  static final String SERVER_PRINCIPAL_KEY = "test.ipc.server.principal";
+  private static Configuration conf;
+  static {
+    conf = new Configuration();
+    conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+  }
+
+  static {
+    ((Log4JLogger) Client.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) Server.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) SaslRpcClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) SaslRpcServer.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) SaslInputStream.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  long blockKeyUpdateInterval = 10 * 60 * 1000; // 10 mins
+  long blockTokenLifetime = 2 * 60 * 1000; // 2 mins
+  Block block1 = new Block(0L);
+  Block block2 = new Block(10L);
+  Block block3 = new Block(-108L);
+
+  private static class getLengthAnswer implements Answer<Long> {
+    BlockTokenSecretManager sm;
+    BlockTokenIdentifier ident;
+
+    public getLengthAnswer(BlockTokenSecretManager sm,
+        BlockTokenIdentifier ident) {
+      this.sm = sm;
+      this.ident = ident;
+    }
+
+    @Override
+    public Long answer(InvocationOnMock invocation) throws IOException {
+      Object args[] = invocation.getArguments();
+      assertEquals(1, args.length);
+      Block block = (Block) args[0];
+      Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
+          .getTokenIdentifiers();
+      assertEquals("Only one BlockTokenIdentifier expected", 1, tokenIds.size());
+      long result = 0;
+      for (TokenIdentifier tokenId : tokenIds) {
+        BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId;
+        LOG.info("Got: " + id.toString());
+        assertTrue("Received BlockTokenIdentifier is wrong", ident.equals(id));
+        sm.checkAccess(id, null, block, BlockTokenSecretManager.AccessMode.WRITE);
+        result = id.getBlockId();
+      }
+      return result;
+    }
+  }
+
+  private BlockTokenIdentifier generateTokenId(BlockTokenSecretManager sm,
+      Block block, EnumSet<BlockTokenSecretManager.AccessMode> accessModes)
+      throws IOException {
+    Token<BlockTokenIdentifier> token = sm.generateToken(block, accessModes);
+    BlockTokenIdentifier id = sm.createIdentifier();
+    id.readFields(new DataInputStream(new ByteArrayInputStream(token
+        .getIdentifier())));
+    return id;
+  }
+
+  @Test
+  public void testWritable() throws Exception {
+    TestWritable.testWritable(new BlockTokenIdentifier());
+    BlockTokenSecretManager sm = new BlockTokenSecretManager(true,
+        blockKeyUpdateInterval, blockTokenLifetime);
+    TestWritable.testWritable(generateTokenId(sm, block1, EnumSet
+        .allOf(BlockTokenSecretManager.AccessMode.class)));
+    TestWritable.testWritable(generateTokenId(sm, block2, EnumSet
+        .of(BlockTokenSecretManager.AccessMode.WRITE)));
+    TestWritable.testWritable(generateTokenId(sm, block3, EnumSet
+        .noneOf(BlockTokenSecretManager.AccessMode.class)));
+  }
+
+  private void tokenGenerationAndVerification(BlockTokenSecretManager master,
+      BlockTokenSecretManager slave) throws Exception {
+    // single-mode tokens
+    for (BlockTokenSecretManager.AccessMode mode : BlockTokenSecretManager.AccessMode
+        .values()) {
+      // generated by master
+      Token<BlockTokenIdentifier> token1 = master.generateToken(block1,
+          EnumSet.of(mode));
+      master.checkAccess(token1, null, block1, mode);
+      slave.checkAccess(token1, null, block1, mode);
+      // generated by slave
+      Token<BlockTokenIdentifier> token2 = slave.generateToken(block2,
+          EnumSet.of(mode));
+      master.checkAccess(token2, null, block2, mode);
+      slave.checkAccess(token2, null, block2, mode);
+    }
+    // multi-mode tokens
+    Token<BlockTokenIdentifier> mtoken = master.generateToken(block3, EnumSet
+        .allOf(BlockTokenSecretManager.AccessMode.class));
+    for (BlockTokenSecretManager.AccessMode mode : BlockTokenSecretManager.AccessMode
+        .values()) {
+      master.checkAccess(mtoken, null, block3, mode);
+      slave.checkAccess(mtoken, null, block3, mode);
+    }
+  }
+
+  /** test block key and token handling */
+  @Test
+  public void testBlockTokenSecretManager() throws Exception {
+    BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(true,
+        blockKeyUpdateInterval, blockTokenLifetime);
+    BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(false,
+        blockKeyUpdateInterval, blockTokenLifetime);
+    ExportedBlockKeys keys = masterHandler.exportKeys();
+    slaveHandler.setKeys(keys);
+    tokenGenerationAndVerification(masterHandler, slaveHandler);
+    // key updating
+    masterHandler.updateKeys();
+    tokenGenerationAndVerification(masterHandler, slaveHandler);
+    keys = masterHandler.exportKeys();
+    slaveHandler.setKeys(keys);
+    tokenGenerationAndVerification(masterHandler, slaveHandler);
+  }
+
+  @Test
+  public void testBlockTokenRpc() throws Exception {
+    BlockTokenSecretManager sm = new BlockTokenSecretManager(true,
+        blockKeyUpdateInterval, blockTokenLifetime);
+    Token<BlockTokenIdentifier> token = sm.generateToken(block3,
+        EnumSet.allOf(BlockTokenSecretManager.AccessMode.class));
+
+    ClientDatanodeProtocol mockDN = mock(ClientDatanodeProtocol.class);
+    when(mockDN.getProtocolVersion(anyString(), anyLong())).thenReturn(
+        ClientDatanodeProtocol.versionID);
+    BlockTokenIdentifier id = sm.createIdentifier();
+    id.readFields(new DataInputStream(new ByteArrayInputStream(token
+        .getIdentifier())));
+    doAnswer(new getLengthAnswer(sm, id)).when(mockDN).getReplicaVisibleLength(
+        any(Block.class));
+
+    final Server server = RPC.getServer(ClientDatanodeProtocol.class, mockDN,
+        ADDRESS, 0, 5, true, conf, sm);
+
+    server.start();
+
+    final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    final UserGroupInformation ticket = UserGroupInformation
+        .createRemoteUser(block3.toString());
+    ticket.addToken(token);
+
+    ClientDatanodeProtocol proxy = null;
+    try {
+      proxy = (ClientDatanodeProtocol) RPC.getProxy(
+          ClientDatanodeProtocol.class, ClientDatanodeProtocol.versionID, addr,
+          ticket, conf, NetUtils.getDefaultSocketFactory(conf));
+      assertEquals(block3.getBlockId(), proxy.getReplicaVisibleLength(block3));
+    } finally {
+      server.stop();
+      if (proxy != null) {
+        RPC.stopProxy(proxy);
+      }
+    }
+  }
+
+}

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Wed May 26 23:31:56 2010
@@ -46,7 +46,7 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -236,7 +236,7 @@ public class TestBlockReplacement extend
     out.writeLong(block.getGenerationStamp());
     Text.writeString(out, source.getStorageID());
     sourceProxy.write(out);
-    BlockAccessToken.DUMMY_TOKEN.write(out);
+    BlockTokenSecretManager.DUMMY_TOKEN.write(out);
     out.flush();
     // receiveResponse
     DataInputStream reply = new DataInputStream(sock.getInputStream());

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Wed May 26 23:31:56 2010
@@ -249,7 +249,7 @@ public class TestDataNodeVolumeFailure e
       BlockReader.newBlockReader(s, targetAddr.toString() + ":" + 
           block.getBlockId(), 
           block.getBlockId(), 
-          lblock.getAccessToken(),
+          lblock.getBlockToken(),
           block.getGenerationStamp(), 
           0, -1, 4096);
 

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Wed May 26 23:31:56 2010
@@ -36,7 +36,7 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Sender;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -121,7 +121,7 @@ public class TestDiskError extends TestC
           block.getBlock().getGenerationStamp(), 1, 
           BlockConstructionStage.PIPELINE_SETUP_CREATE, 
           0L, 0L, 0L, "", null, new DatanodeInfo[0], 
-          BlockAccessToken.DUMMY_TOKEN);
+          BlockTokenSecretManager.DUMMY_TOKEN);
 
       // write check header
       out.writeByte( 1 );

Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java?rev=948634&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java Wed May 26 23:31:56 2010
@@ -0,0 +1,532 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.*;
+import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.token.*;
+import org.apache.log4j.Level;
+
+import junit.framework.TestCase;
+
+public class TestBlockTokenWithDFS extends TestCase {
+
+  private static final int BLOCK_SIZE = 1024;
+  private static final int FILE_SIZE = 2 * BLOCK_SIZE;
+  private static final String FILE_TO_READ = "/fileToRead.dat";
+  private static final String FILE_TO_WRITE = "/fileToWrite.dat";
+  private static final String FILE_TO_APPEND = "/fileToAppend.dat";
+  private final byte[] rawData = new byte[FILE_SIZE];
+
+  {
+    ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
+    Random r = new Random();
+    r.nextBytes(rawData);
+  }
+
+  private void createFile(FileSystem fs, Path filename) throws IOException {
+    FSDataOutputStream out = fs.create(filename);
+    out.write(rawData);
+    out.close();
+  }
+
+  // read a file using blockSeekTo()
+  private boolean checkFile1(FSDataInputStream in) {
+    byte[] toRead = new byte[FILE_SIZE];
+    int totalRead = 0;
+    int nRead = 0;
+    try {
+      while ((nRead = in.read(toRead, totalRead, toRead.length - totalRead)) > 0) {
+        totalRead += nRead;
+      }
+    } catch (IOException e) {
+      return false;
+    }
+    assertEquals("Cannot read file.", toRead.length, totalRead);
+    return checkFile(toRead);
+  }
+
+  // read a file using fetchBlockByteRange()
+  private boolean checkFile2(FSDataInputStream in) {
+    byte[] toRead = new byte[FILE_SIZE];
+    try {
+      assertEquals("Cannot read file", toRead.length, in.read(0, toRead, 0,
+          toRead.length));
+    } catch (IOException e) {
+      return false;
+    }
+    return checkFile(toRead);
+  }
+
+  private boolean checkFile(byte[] fileToCheck) {
+    if (fileToCheck.length != rawData.length) {
+      return false;
+    }
+    for (int i = 0; i < fileToCheck.length; i++) {
+      if (fileToCheck[i] != rawData[i]) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  // creates a file and returns a descriptor for writing to it
+  private static FSDataOutputStream writeFile(FileSystem fileSys, Path name,
+      short repl, long blockSize) throws IOException {
+    FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf()
+        .getInt("io.file.buffer.size", 4096), repl, blockSize);
+    return stm;
+  }
+
+  // try reading a block using a BlockReader directly
+  private static void tryRead(Configuration conf, LocatedBlock lblock,
+      boolean shouldSucceed) {
+    InetSocketAddress targetAddr = null;
+    Socket s = null;
+    BlockReader blockReader = null;
+    Block block = lblock.getBlock();
+    try {
+      DatanodeInfo[] nodes = lblock.getLocations();
+      targetAddr = NetUtils.createSocketAddr(nodes[0].getName());
+      s = new Socket();
+      s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
+      s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
+
+      blockReader = BlockReader.newBlockReader(s, targetAddr
+          .toString()
+          + ":" + block.getBlockId(), block.getBlockId(), lblock
+          .getBlockToken(), block.getGenerationStamp(), 0, -1, conf.getInt(
+          "io.file.buffer.size", 4096));
+
+    } catch (IOException ex) {
+      if (ex instanceof InvalidBlockTokenException) {
+        assertFalse("OP_READ_BLOCK: access token is invalid, "
+            + "when it is expected to be valid", shouldSucceed);
+        return;
+      }
+      fail("OP_READ_BLOCK failed due to reasons other than access token");
+    } finally {
+      if (s != null) {
+        try {
+          s.close();
+        } catch (IOException iex) {
+        } finally {
+          s = null;
+        }
+      }
+    }
+    if (blockReader == null) {
+      fail("OP_READ_BLOCK failed due to reasons other than access token");
+    }
+    assertTrue("OP_READ_BLOCK: access token is valid, "
+        + "when it is expected to be invalid", shouldSucceed);
+  }
+
+  // get a conf for testing
+  private static Configuration getConf(int numDataNodes) throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    conf.setLong("dfs.block.size", BLOCK_SIZE);
+    conf.setInt("io.bytes.per.checksum", BLOCK_SIZE);
+    conf.setInt("dfs.heartbeat.interval", 1);
+    conf.setInt("dfs.replication", numDataNodes);
+    conf.setInt("ipc.client.connect.max.retries", 0);
+    conf.setBoolean("dfs.support.append", true);
+    return conf;
+  }
+
+  /*
+   * testing that APPEND operation can handle token expiration when
+   * re-establishing pipeline is needed
+   */
+  public void testAppend() throws Exception {
+    MiniDFSCluster cluster = null;
+    int numDataNodes = 2;
+    Configuration conf = getConf(numDataNodes);
+
+    try {
+      cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+      cluster.waitActive();
+      assertEquals(numDataNodes, cluster.getDataNodes().size());
+      // set a short token lifetime (1 second)
+      SecurityTestUtil.setBlockTokenLifetime(
+          cluster.getNameNode().getNamesystem().blockTokenSecretManager, 1000L);
+      Path fileToAppend = new Path(FILE_TO_APPEND);
+      FileSystem fs = cluster.getFileSystem();
+
+      // write a one-byte file
+      FSDataOutputStream stm = writeFile(fs, fileToAppend,
+          (short) numDataNodes, BLOCK_SIZE);
+      stm.write(rawData, 0, 1);
+      stm.close();
+      // open the file again for append
+      stm = fs.append(fileToAppend);
+      int mid = rawData.length - 1;
+      stm.write(rawData, 1, mid - 1);
+      stm.sync();
+
+      /*
+       * wait till token used in stm expires
+       */
+      Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(stm);
+      while (!SecurityTestUtil.isBlockTokenExpired(token)) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException ignored) {
+        }
+      }
+
+      // remove a datanode to force re-establishing pipeline
+      cluster.stopDataNode(0);
+      // append the rest of the file
+      stm.write(rawData, mid, rawData.length - mid);
+      stm.close();
+      // check if append is successful
+      FSDataInputStream in5 = fs.open(fileToAppend);
+      assertTrue(checkFile1(in5));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /*
+   * testing that WRITE operation can handle token expiration when
+   * re-establishing pipeline is needed
+   */
+  public void testWrite() throws Exception {
+    MiniDFSCluster cluster = null;
+    int numDataNodes = 2;
+    Configuration conf = getConf(numDataNodes);
+
+    try {
+      cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+      cluster.waitActive();
+      assertEquals(numDataNodes, cluster.getDataNodes().size());
+      // set a short token lifetime (1 second)
+      SecurityTestUtil.setBlockTokenLifetime(
+          cluster.getNameNode().getNamesystem().blockTokenSecretManager, 1000L);
+      Path fileToWrite = new Path(FILE_TO_WRITE);
+      FileSystem fs = cluster.getFileSystem();
+
+      FSDataOutputStream stm = writeFile(fs, fileToWrite, (short) numDataNodes,
+          BLOCK_SIZE);
+      // write a partial block
+      int mid = rawData.length - 1;
+      stm.write(rawData, 0, mid);
+      stm.sync();
+
+      /*
+       * wait till token used in stm expires
+       */
+      Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(stm);
+      while (!SecurityTestUtil.isBlockTokenExpired(token)) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException ignored) {
+        }
+      }
+
+      // remove a datanode to force re-establishing pipeline
+      cluster.stopDataNode(0);
+      // write the rest of the file
+      stm.write(rawData, mid, rawData.length - mid);
+      stm.close();
+      // check if write is successful
+      FSDataInputStream in4 = fs.open(fileToWrite);
+      assertTrue(checkFile1(in4));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  public void testRead() throws Exception {
+    MiniDFSCluster cluster = null;
+    int numDataNodes = 2;
+    Configuration conf = getConf(numDataNodes);
+
+    try {
+      cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+      cluster.waitActive();
+      assertEquals(numDataNodes, cluster.getDataNodes().size());
+      // set a short token lifetime (1 second) initially
+      SecurityTestUtil.setBlockTokenLifetime(
+          cluster.getNameNode().getNamesystem().blockTokenSecretManager, 1000L);
+      Path fileToRead = new Path(FILE_TO_READ);
+      FileSystem fs = cluster.getFileSystem();
+      createFile(fs, fileToRead);
+
+      /*
+       * setup for testing expiration handling of cached tokens
+       */
+
+      // read using blockSeekTo(). Acquired tokens are cached in in1
+      FSDataInputStream in1 = fs.open(fileToRead);
+      assertTrue(checkFile1(in1));
+      // read using blockSeekTo(). Acquired tokens are cached in in2
+      FSDataInputStream in2 = fs.open(fileToRead);
+      assertTrue(checkFile1(in2));
+      // read using fetchBlockByteRange(). Acquired tokens are cached in in3
+      FSDataInputStream in3 = fs.open(fileToRead);
+      assertTrue(checkFile2(in3));
+
+      /*
+       * testing READ interface on DN using a BlockReader
+       */
+
+      DFSClient dfsclient = new DFSClient(new InetSocketAddress("localhost",
+          cluster.getNameNodePort()), conf);
+      List<LocatedBlock> locatedBlocks = cluster.getNameNode().getBlockLocations(
+          FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks();
+      LocatedBlock lblock = locatedBlocks.get(0); // first block
+      Token<BlockTokenIdentifier> myToken = lblock.getBlockToken();
+      // verify token is not expired
+      assertFalse(SecurityTestUtil.isBlockTokenExpired(myToken));
+      // read with valid token, should succeed
+      tryRead(conf, lblock, true);
+
+      /*
+       * wait till myToken and all cached tokens in in1, in2 and in3 expire
+       */
+
+      while (!SecurityTestUtil.isBlockTokenExpired(myToken)) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException ignored) {
+        }
+      }
+
+      /*
+       * continue testing READ interface on DN using a BlockReader
+       */
+
+      // verify token is expired
+      assertTrue(SecurityTestUtil.isBlockTokenExpired(myToken));
+      // read should fail
+      tryRead(conf, lblock, false);
+      // use a valid new token
+      lblock.setBlockToken(cluster.getNameNode().getNamesystem()
+          .blockTokenSecretManager.generateToken(lblock.getBlock(),
+              EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
+      // read should succeed
+      tryRead(conf, lblock, true);
+      // use a token with wrong blockID
+      Block wrongBlock = new Block(lblock.getBlock().getBlockId() + 1);
+      lblock.setBlockToken(cluster.getNameNode().getNamesystem()
+          .blockTokenSecretManager.generateToken(wrongBlock,
+              EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
+      // read should fail
+      tryRead(conf, lblock, false);
+      // use a token with wrong access modes
+      lblock.setBlockToken(cluster.getNameNode().getNamesystem()
+          .blockTokenSecretManager.generateToken(lblock.getBlock(), EnumSet.of(
+              BlockTokenSecretManager.AccessMode.WRITE,
+              BlockTokenSecretManager.AccessMode.COPY,
+              BlockTokenSecretManager.AccessMode.REPLACE)));
+      // read should fail
+      tryRead(conf, lblock, false);
+
+      // set a long token lifetime for future tokens
+      SecurityTestUtil.setBlockTokenLifetime(
+          cluster.getNameNode().getNamesystem().blockTokenSecretManager, 600 * 1000L);
+
+      /*
+       * testing that when cached tokens are expired, DFSClient will re-fetch
+       * tokens transparently for READ.
+       */
+
+      // confirm all tokens cached in in1 are expired by now
+      List<LocatedBlock> lblocks = DFSTestUtil.getAllBlocks(in1);
+      for (LocatedBlock blk : lblocks) {
+        assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
+      }
+      // verify blockSeekTo() is able to re-fetch token transparently
+      in1.seek(0);
+      assertTrue(checkFile1(in1));
+
+      // confirm all tokens cached in in2 are expired by now
+      List<LocatedBlock> lblocks2 = DFSTestUtil.getAllBlocks(in2);
+      for (LocatedBlock blk : lblocks2) {
+        assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
+      }
+      // verify blockSeekTo() is able to re-fetch token transparently (testing
+      // via another interface method)
+      assertTrue(in2.seekToNewSource(0));
+      assertTrue(checkFile1(in2));
+
+      // confirm all tokens cached in in3 are expired by now
+      List<LocatedBlock> lblocks3 = DFSTestUtil.getAllBlocks(in3);
+      for (LocatedBlock blk : lblocks3) {
+        assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
+      }
+      // verify fetchBlockByteRange() is able to re-fetch token transparently
+      assertTrue(checkFile2(in3));
+
+      /*
+       * testing that after datanodes are restarted on the same ports, cached
+       * tokens should still work and there is no need to fetch new tokens from
+       * namenode. This test should run while namenode is down (to make sure no
+       * new tokens can be fetched from namenode).
+       */
+
+      // restart datanodes on the same ports that they currently use
+      assertTrue(cluster.restartDataNodes(true));
+      cluster.waitActive();
+      assertEquals(numDataNodes, cluster.getDataNodes().size());
+      cluster.shutdownNameNode();
+
+      // confirm tokens cached in in1 are still valid
+      lblocks = DFSTestUtil.getAllBlocks(in1);
+      for (LocatedBlock blk : lblocks) {
+        assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
+      }
+      // verify blockSeekTo() still works (forced to use cached tokens)
+      in1.seek(0);
+      assertTrue(checkFile1(in1));
+
+      // confirm tokens cached in in2 are still valid
+      lblocks2 = DFSTestUtil.getAllBlocks(in2);
+      for (LocatedBlock blk : lblocks2) {
+        assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
+      }
+      // verify blockSeekTo() still works (forced to use cached tokens)
+      in2.seekToNewSource(0);
+      assertTrue(checkFile1(in2));
+
+      // confirm tokens cached in in3 are still valid
+      lblocks3 = DFSTestUtil.getAllBlocks(in3);
+      for (LocatedBlock blk : lblocks3) {
+        assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
+      }
+      // verify fetchBlockByteRange() still works (forced to use cached tokens)
+      assertTrue(checkFile2(in3));
+
+      /*
+       * testing that when namenode is restarted, cached tokens should still
+       * work and there is no need to fetch new tokens from namenode. Like the
+       * previous test, this test should also run while namenode is down. The
+       * setup for this test depends on the previous test.
+       */
+
+      // restart the namenode and then shut it down for test
+      cluster.restartNameNode();
+      cluster.shutdownNameNode();
+
+      // verify blockSeekTo() still works (forced to use cached tokens)
+      in1.seek(0);
+      assertTrue(checkFile1(in1));
+      // verify again blockSeekTo() still works (forced to use cached tokens)
+      in2.seekToNewSource(0);
+      assertTrue(checkFile1(in2));
+      // verify fetchBlockByteRange() still works (forced to use cached tokens)
+      assertTrue(checkFile2(in3));
+
+      /*
+       * testing that after both namenode and datanodes got restarted (namenode
+       * first, followed by datanodes), DFSClient can't access DN without
+       * re-fetching tokens and is able to re-fetch tokens transparently. The
+       * setup of this test depends on the previous test.
+       */
+
+      // restore the cluster and restart the datanodes for test
+      cluster.restartNameNode();
+      assertTrue(cluster.restartDataNodes(true));
+      cluster.waitActive();
+      assertEquals(numDataNodes, cluster.getDataNodes().size());
+
+      // shutdown namenode so that DFSClient can't get new tokens from namenode
+      cluster.shutdownNameNode();
+
+      // verify blockSeekTo() fails (cached tokens become invalid)
+      in1.seek(0);
+      assertFalse(checkFile1(in1));
+      // verify fetchBlockByteRange() fails (cached tokens become invalid)
+      assertFalse(checkFile2(in3));
+
+      // restart the namenode to allow DFSClient to re-fetch tokens
+      cluster.restartNameNode();
+      // verify blockSeekTo() works again (by transparently re-fetching
+      // tokens from namenode)
+      in1.seek(0);
+      assertTrue(checkFile1(in1));
+      in2.seekToNewSource(0);
+      assertTrue(checkFile1(in2));
+      // verify fetchBlockByteRange() works again (by transparently
+      // re-fetching tokens from namenode)
+      assertTrue(checkFile2(in3));
+
+      /*
+       * testing that when datanodes are restarted on different ports, DFSClient
+       * is able to re-fetch tokens transparently to connect to them
+       */
+
+      // restart datanodes on newly assigned ports
+      assertTrue(cluster.restartDataNodes(false));
+      cluster.waitActive();
+      assertEquals(numDataNodes, cluster.getDataNodes().size());
+      // verify blockSeekTo() is able to re-fetch token transparently
+      in1.seek(0);
+      assertTrue(checkFile1(in1));
+      // verify blockSeekTo() is able to re-fetch token transparently
+      in2.seekToNewSource(0);
+      assertTrue(checkFile1(in2));
+      // verify fetchBlockByteRange() is able to re-fetch token transparently
+      assertTrue(checkFile2(in3));
+
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /*
+   * Integration testing of access token, involving NN, DN, and Balancer
+   */
+  public void testEnd2End() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    new TestBalancer().integrationTest(conf);
+  }
+}