Posted to hdfs-commits@hadoop.apache.org by jg...@apache.org on 2010/05/27 01:31:57 UTC

svn commit: r948634 [1/2] - in /hadoop/hdfs/trunk: ./ ivy/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/security/ src/java/org/apache/hadoop/hdfs/security/token/block/ src/java/org/apache/ha...

Author: jghoman
Date: Wed May 26 23:31:56 2010
New Revision: 948634

URL: http://svn.apache.org/viewvc?rev=948634&view=rev
Log:
HDFS-992. Re-factor block access token implementation to conform to the generic Token interface in Common (Kan Zhang and Jitendra Pandey via jghoman)

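For reference, the shape of the change: the HDFS-specific BlockAccessToken is replaced throughout by Common's generic Token class, parameterized with the new BlockTokenIdentifier. A minimal sketch of the new type, assuming only the classes this patch adds (the class name below is illustrative, not part of the patch):

import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;

public class TokenShapeSketch {
  public static void main(String[] args) {
    // Before this patch: org.apache.hadoop.hdfs.security.BlockAccessToken.
    // After: the generic Token from Common, parameterized by the
    // HDFS-specific identifier type added below.
    Token<BlockTokenIdentifier> blockToken = new Token<BlockTokenIdentifier>();
    System.out.println("kind=" + blockToken.getKind()); // empty until issued
  }
}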
Added:
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/security/token/
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/SecurityTestUtil.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java
Removed:
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/AccessTokenHandler.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/BlockAccessKey.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/BlockAccessToken.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/ExportedAccessKeys.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/InvalidAccessTokenException.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/security/SecurityTestUtil.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/security/TestAccessToken.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestAccessTokenWithDFS.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/ivy/libraries.properties
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Wed May 26 23:31:56 2010
@@ -2,6 +2,11 @@ Hadoop HDFS Change Log
 
 Trunk (unreleased changes)
 
+  NEW FEATURES
+
+    HDFS-992. Re-factor block access token implementation to conform to the 
+    generic Token interface in Common (Kan Zhang and Jitendra Pandey via jghoman)
+
   IMPROVEMENTS
 
     HDFS-1132. Refactor TestFileStatus (Eli Collins via cos)

Modified: hadoop/hdfs/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/ivy/libraries.properties?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/ivy/libraries.properties (original)
+++ hadoop/hdfs/trunk/ivy/libraries.properties Wed May 26 23:31:56 2010
@@ -16,7 +16,7 @@
 #These are the versions of our dependencies (in alphabetical order)
 apacheant.version=1.7.1
 ant-task.version=2.0.10
-avro.version=1.3.1
+avro.version=1.3.2
 
 checkstyle.version=4.2
 

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java Wed May 26 23:31:56 2010
@@ -33,11 +33,12 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
-import org.apache.hadoop.hdfs.security.InvalidAccessTokenException;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
 /** This is a wrapper around connection to datanode
@@ -353,26 +354,27 @@ public class BlockReader extends FSInput
     checksumSize = this.checksum.getChecksumSize();
   }
 
-  public static BlockReader newBlockReader(Socket sock, String file, long blockId, BlockAccessToken accessToken, 
-      long genStamp, long startOffset, long len, int bufferSize) throws IOException {
-    return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset, len, bufferSize,
+  public static BlockReader newBlockReader(Socket sock, String file,
+      long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp,
+      long startOffset, long len, int bufferSize) throws IOException {
+    return newBlockReader(sock, file, blockId, blockToken, genStamp, startOffset, len, bufferSize,
         true);
   }
 
   /** Java Doc required */
   public static BlockReader newBlockReader( Socket sock, String file, long blockId, 
-                                     BlockAccessToken accessToken,
+                                     Token<BlockTokenIdentifier> blockToken,
                                      long genStamp,
                                      long startOffset, long len,
                                      int bufferSize, boolean verifyChecksum)
                                      throws IOException {
-    return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset,
+    return newBlockReader(sock, file, blockId, blockToken, genStamp, startOffset,
                           len, bufferSize, verifyChecksum, "");
   }
 
   public static BlockReader newBlockReader( Socket sock, String file,
                                      long blockId, 
-                                     BlockAccessToken accessToken,
+                                     Token<BlockTokenIdentifier> blockToken,
                                      long genStamp,
                                      long startOffset, long len,
                                      int bufferSize, boolean verifyChecksum,
@@ -382,7 +384,7 @@ public class BlockReader extends FSInput
     DataTransferProtocol.Sender.opReadBlock(
         new DataOutputStream(new BufferedOutputStream(
             NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT))),
-        blockId, genStamp, startOffset, len, clientName, accessToken);
+        blockId, genStamp, startOffset, len, clientName, blockToken);
     
     //
     // Get bytes in block, set streams
@@ -395,7 +397,7 @@ public class BlockReader extends FSInput
     DataTransferProtocol.Status status = DataTransferProtocol.Status.read(in);
     if (status != SUCCESS) {
       if (status == ERROR_ACCESS_TOKEN) {
-        throw new InvalidAccessTokenException(
+        throw new InvalidBlockTokenException(
             "Got access token error for OP_READ_BLOCK, self="
                 + sock.getLocalSocketAddress() + ", remote="
                 + sock.getRemoteSocketAddress() + ", for file " + file

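The BlockReader change is mechanical: every newBlockReader overload now takes a Token<BlockTokenIdentifier>, and a datanode's token rejection surfaces as InvalidBlockTokenException instead of InvalidAccessTokenException. A hedged sketch of a caller, with illustrative helper and variable names:

import java.io.IOException;
import java.net.Socket;

import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;

class ReadSketch {
  static BlockReader open(Socket sock, String src, LocatedBlock located,
      long startOffset, long len, int bufferSize, String clientName)
      throws IOException {
    Block blk = located.getBlock();
    // The generic token is handed straight through to the datanode handshake.
    return BlockReader.newBlockReader(sock, src, blk.getBlockId(),
        located.getBlockToken(), blk.getGenerationStamp(),
        startOffset, len, bufferSize, true /* verifyChecksum */, clientName);
  }
}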
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Wed May 26 23:31:56 2010
@@ -101,6 +101,7 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 
 /********************************************************
  * DFSClient can connect to a Hadoop Filesystem and 
@@ -183,15 +184,20 @@ public class DFSClient implements FSCons
         rpcNamenode, methodNameToPolicyMap);
   }
 
-  static ClientDatanodeProtocol createClientDatanodeProtocolProxy (
-      DatanodeID datanodeid, Configuration conf) throws IOException {
+  static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
+      DatanodeID datanodeid, Configuration conf, LocatedBlock locatedBlock)
+      throws IOException {
     InetSocketAddress addr = NetUtils.createSocketAddr(
       datanodeid.getHost() + ":" + datanodeid.getIpcPort());
     if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
       ClientDatanodeProtocol.LOG.info("ClientDatanodeProtocol addr=" + addr);
     }
+    UserGroupInformation ticket = UserGroupInformation
+        .createRemoteUser(locatedBlock.getBlock().toString());
+    ticket.addToken(locatedBlock.getBlockToken());
     return (ClientDatanodeProtocol)RPC.getProxy(ClientDatanodeProtocol.class,
-        ClientDatanodeProtocol.versionID, addr, conf);
+        ClientDatanodeProtocol.versionID, addr, ticket, conf, NetUtils
+        .getDefaultSocketFactory(conf));
   }
         
   /**
@@ -946,7 +952,7 @@ public class DFSClient implements FSCons
           }
           // get block MD5
           DataTransferProtocol.Sender.opBlockChecksum(out, block.getBlockId(),
-              block.getGenerationStamp(), lb.getAccessToken());
+              block.getGenerationStamp(), lb.getBlockToken());
 
           final DataTransferProtocol.Status reply = DataTransferProtocol.Status.read(in);
           if (reply != SUCCESS) {

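The DFSClient change is what attaches the block token to client-to-datanode RPC: the proxy is built over a remote UGI named after the block, and the token added to that UGI is what the @TokenInfo-annotated ClientDatanodeProtocol (further down) authenticates with. Condensed from the diff above, with an illustrative wrapper class:

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;

class ProxySketch {
  static ClientDatanodeProtocol proxyFor(InetSocketAddress addr,
      Configuration conf, LocatedBlock locatedBlock) throws IOException {
    // The "user" of the connection is the block itself; the attached block
    // token, not a Kerberos ticket, authenticates the call.
    UserGroupInformation ticket = UserGroupInformation
        .createRemoteUser(locatedBlock.getBlock().toString());
    ticket.addToken(locatedBlock.getBlockToken());
    return (ClientDatanodeProtocol) RPC.getProxy(ClientDatanodeProtocol.class,
        ClientDatanodeProtocol.versionID, addr, ticket, conf,
        NetUtils.getDefaultSocketFactory(conf));
  }
}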
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java Wed May 26 23:31:56 2010
@@ -34,10 +34,11 @@ import org.apache.hadoop.hdfs.protocol.C
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
-import org.apache.hadoop.hdfs.security.InvalidAccessTokenException;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 
 /****************************************************************
@@ -141,7 +142,7 @@ class DFSInputStream extends FSInputStre
     for(DatanodeInfo datanode : locatedblock.getLocations()) {
       try {
         final ClientDatanodeProtocol cdp = DFSClient.createClientDatanodeProtocolProxy(
-            datanode, dfsClient.conf);
+            datanode, dfsClient.conf, locatedblock);
         final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
         if (n >= 0) {
           return n;
@@ -353,7 +354,7 @@ class DFSInputStream extends FSInputStre
         NetUtils.connect(s, targetAddr, dfsClient.socketTimeout);
         s.setSoTimeout(dfsClient.socketTimeout);
         Block blk = targetBlock.getBlock();
-        BlockAccessToken accessToken = targetBlock.getAccessToken();
+        Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
         
         blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(), 
             accessToken, 
@@ -362,7 +363,7 @@ class DFSInputStream extends FSInputStre
             buffersize, verifyChecksum, dfsClient.clientName);
         return chosenNode;
       } catch (IOException ex) {
-        if (ex instanceof InvalidAccessTokenException && refetchToken > 0) {
+        if (ex instanceof InvalidBlockTokenException && refetchToken > 0) {
           DFSClient.LOG.info("Will fetch a new access token and retry, " 
               + "access token was invalid when connecting to " + targetAddr
               + " : " + ex);
@@ -593,13 +594,13 @@ class DFSInputStream extends FSInputStre
         dn = dfsClient.socketFactory.createSocket();
         NetUtils.connect(dn, targetAddr, dfsClient.socketTimeout);
         dn.setSoTimeout(dfsClient.socketTimeout);
-        BlockAccessToken accessToken = block.getAccessToken();
+        Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
             
         int len = (int) (end - start + 1);
             
         reader = BlockReader.newBlockReader(dn, src, 
                                             block.getBlock().getBlockId(),
-                                            accessToken,
+                                            blockToken,
                                             block.getBlock().getGenerationStamp(),
                                             start, len, buffersize, 
                                             verifyChecksum, dfsClient.clientName);
@@ -615,7 +616,7 @@ class DFSInputStream extends FSInputStre
                  e.getPos() + " from " + chosenNode.getName());
         dfsClient.reportChecksumFailure(src, block.getBlock(), chosenNode);
       } catch (IOException e) {
-        if (e instanceof InvalidAccessTokenException && refetchToken > 0) {
+        if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
           DFSClient.LOG.info("Will get a new access token and retry, "
               + "access token was invalid when connecting to " + targetAddr
               + " : " + e);

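Both read paths in DFSInputStream share one pattern: when a datanode rejects a stale token, the client re-fetches block locations (which carry freshly minted tokens) exactly once before failing. A self-contained sketch of that pattern, with illustrative helper names:

import java.io.IOException;

import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;

class RetrySketch {
  interface BlockOp { void run() throws IOException; }

  static void withTokenRetry(BlockOp op, Runnable refetchLocations)
      throws IOException {
    int refetchToken = 1; // at most one refresh, as in the diff above
    while (true) {
      try {
        op.run();
        return;
      } catch (IOException e) {
        if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
          refetchToken--;
          refetchLocations.run(); // new LocatedBlocks => new block tokens
        } else {
          throw e;
        }
      }
    }
  }
}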
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Wed May 26 23:31:56 2010
@@ -54,8 +54,6 @@ import org.apache.hadoop.hdfs.protocol.N
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
-import org.apache.hadoop.hdfs.security.InvalidAccessTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
@@ -65,11 +63,14 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.PureJavaCrc32;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 
 /****************************************************************
  * DFSOutputStream creates files from a stream of bytes.
@@ -272,7 +273,7 @@ class DFSOutputStream extends FSOutputSu
   class DataStreamer extends Daemon {
     private volatile boolean streamerClosed = false;
     private Block block; // its length is number of bytes acked
-    private BlockAccessToken accessToken;
+    private Token<BlockTokenIdentifier> accessToken;
     private DataOutputStream blockStream;
     private DataInputStream blockReplyStream;
     private ResponseProcessor response = null;
@@ -302,7 +303,7 @@ class DFSOutputStream extends FSOutputSu
       stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
       block = lastBlock.getBlock();
       bytesSent = block.getNumBytes();
-      accessToken = lastBlock.getAccessToken();
+      accessToken = lastBlock.getBlockToken();
       long usedInLastBlock = stat.getLen() % blockSize;
       int freeInLastBlock = (int)(blockSize - usedInLastBlock);
 
@@ -773,7 +774,7 @@ class DFSOutputStream extends FSOutputSu
         // get a new generation stamp and an access token
         LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName);
         newGS = lb.getBlock().getGenerationStamp();
-        accessToken = lb.getAccessToken();
+        accessToken = lb.getBlockToken();
         
         // set up the pipeline again with the remaining nodes
         success = createBlockOutputStream(nodes, newGS, isRecovery);
@@ -813,7 +814,7 @@ class DFSOutputStream extends FSOutputSu
         lb = locateFollowingBlock(startTime, w.length > 0 ? w : null);
         block = lb.getBlock();
         block.setNumBytes(0);
-        accessToken = lb.getAccessToken();
+        accessToken = lb.getBlockToken();
         nodes = lb.getLocations();
 
         //
@@ -884,7 +885,7 @@ class DFSOutputStream extends FSOutputSu
         firstBadLink = Text.readString(blockReplyStream);
         if (pipelineStatus != SUCCESS) {
           if (pipelineStatus == ERROR_ACCESS_TOKEN) {
-            throw new InvalidAccessTokenException(
+            throw new InvalidBlockTokenException(
                 "Got access token error for connect ack with firstBadLink as "
                     + firstBadLink);
           } else {
@@ -977,7 +978,7 @@ class DFSOutputStream extends FSOutputSu
       return nodes;
     }
 
-    BlockAccessToken getAccessToken() {
+    Token<BlockTokenIdentifier> getBlockToken() {
       return accessToken;
     }
 
@@ -1455,8 +1456,8 @@ class DFSOutputStream extends FSOutputSu
   /**
    * Returns the access token currently used by streamer, for testing only
    */
-  BlockAccessToken getAccessToken() {
-    return streamer.getAccessToken();
+  Token<BlockTokenIdentifier> getBlockToken() {
+    return streamer.getBlockToken();
   }
 
 }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Wed May 26 23:31:56 2010
@@ -21,10 +21,13 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
 import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.security.token.TokenInfo;
 
 /** A client-datanode protocol for block recovery
  */
+@TokenInfo(BlockTokenSelector.class)
 public interface ClientDatanodeProtocol extends VersionedProtocol {
   public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
 

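The @TokenInfo annotation is the hook the RPC security layer uses: it names the TokenSelector to consult when a connection to this protocol must be authenticated with a token rather than Kerberos. A hedged sketch of the declaration pattern; the interface below is illustrative, not part of this patch:

import java.io.IOException;

import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.token.TokenInfo;

@TokenInfo(BlockTokenSelector.class)
interface ExampleBlockProtocol extends VersionedProtocol {
  long versionID = 1L; // illustrative
  long getReplicaVisibleLength(long blockId) throws IOException;
}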
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Wed May 26 23:31:56 2010
@@ -24,10 +24,11 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.token.Token;
 
 /**
  * Transfer data to/from datanode using a streaming protocol.
@@ -221,9 +222,9 @@ public interface DataTransferProtocol {
     }
 
     /** Send OP_READ_BLOCK */
-    public static void opReadBlock(DataOutputStream out,
-        long blockId, long blockGs, long blockOffset, long blockLen,
-        String clientName, BlockAccessToken accessToken) throws IOException {
+    public static void opReadBlock(DataOutputStream out, long blockId,
+        long blockGs, long blockOffset, long blockLen, String clientName,
+        Token<BlockTokenIdentifier> blockToken) throws IOException {
       op(out, Op.READ_BLOCK);
 
       out.writeLong(blockId);
@@ -231,16 +232,16 @@ public interface DataTransferProtocol {
       out.writeLong(blockOffset);
       out.writeLong(blockLen);
       Text.writeString(out, clientName);
-      accessToken.write(out);
+      blockToken.write(out);
       out.flush();
     }
     
     /** Send OP_WRITE_BLOCK */
-    public static void opWriteBlock(DataOutputStream out,
-        long blockId, long blockGs, int pipelineSize, 
-        BlockConstructionStage stage, long newGs, long minBytesRcvd,
-        long maxBytesRcvd, String client, DatanodeInfo src, 
-        DatanodeInfo[] targets, BlockAccessToken accesstoken) throws IOException {
+    public static void opWriteBlock(DataOutputStream out, long blockId,
+        long blockGs, int pipelineSize, BlockConstructionStage stage,
+        long newGs, long minBytesRcvd, long maxBytesRcvd, String client,
+        DatanodeInfo src, DatanodeInfo[] targets,
+        Token<BlockTokenIdentifier> blockToken) throws IOException {
       op(out, Op.WRITE_BLOCK);
 
       out.writeLong(blockId);
@@ -261,42 +262,44 @@ public interface DataTransferProtocol {
         targets[i].write(out);
       }
 
-      accesstoken.write(out);
+      blockToken.write(out);
     }
     
     /** Send OP_REPLACE_BLOCK */
     public static void opReplaceBlock(DataOutputStream out,
         long blockId, long blockGs, String storageId, DatanodeInfo src,
-        BlockAccessToken accesstoken) throws IOException {
+        Token<BlockTokenIdentifier> blockToken) throws IOException {
       op(out, Op.REPLACE_BLOCK);
 
       out.writeLong(blockId);
       out.writeLong(blockGs);
       Text.writeString(out, storageId);
       src.write(out);
-      accesstoken.write(out);
+      blockToken.write(out);
       out.flush();
     }
 
     /** Send OP_COPY_BLOCK */
-    public static void opCopyBlock(DataOutputStream out,
-        long blockId, long blockGs, BlockAccessToken accesstoken) throws IOException {
+    public static void opCopyBlock(DataOutputStream out, long blockId,
+        long blockGs, Token<BlockTokenIdentifier> blockToken)
+        throws IOException {
       op(out, Op.COPY_BLOCK);
 
       out.writeLong(blockId);
       out.writeLong(blockGs);
-      accesstoken.write(out);
+      blockToken.write(out);
       out.flush();
     }
 
     /** Send OP_BLOCK_CHECKSUM */
-    public static void opBlockChecksum(DataOutputStream out,
-        long blockId, long blockGs, BlockAccessToken accesstoken) throws IOException {
+    public static void opBlockChecksum(DataOutputStream out, long blockId,
+        long blockGs, Token<BlockTokenIdentifier> blockToken)
+        throws IOException {
       op(out, Op.BLOCK_CHECKSUM);
 
       out.writeLong(blockId);
       out.writeLong(blockGs);
-      accesstoken.write(out);
+      blockToken.write(out);
       out.flush();
     }
   }
@@ -345,18 +348,17 @@ public interface DataTransferProtocol {
       final long offset = in.readLong();
       final long length = in.readLong();
       final String client = Text.readString(in);
-      final BlockAccessToken accesstoken = readAccessToken(in);
+      final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
 
-      opReadBlock(in, blockId, blockGs, offset, length, client, accesstoken);
+      opReadBlock(in, blockId, blockGs, offset, length, client, blockToken);
     }
 
     /**
-     * Abstract OP_READ_BLOCK method.
-     * Read a block.
+     * Abstract OP_READ_BLOCK method. Read a block.
      */
-    protected abstract void opReadBlock(DataInputStream in,
-        long blockId, long blockGs, long offset, long length,
-        String client, BlockAccessToken accesstoken) throws IOException;
+    protected abstract void opReadBlock(DataInputStream in, long blockId,
+        long blockGs, long offset, long length, String client,
+        Token<BlockTokenIdentifier> blockToken) throws IOException;
     
     /** Receive OP_WRITE_BLOCK */
     private void opWriteBlock(DataInputStream in) throws IOException {
@@ -379,10 +381,10 @@ public interface DataTransferProtocol {
       for (int i = 0; i < targets.length; i++) {
         targets[i] = DatanodeInfo.read(in);
       }
-      final BlockAccessToken accesstoken = readAccessToken(in);
+      final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
 
       opWriteBlock(in, blockId, blockGs, pipelineSize, stage,
-          newGs, minBytesRcvd, maxBytesRcvd, client, src, targets, accesstoken);
+          newGs, minBytesRcvd, maxBytesRcvd, client, src, targets, blockToken);
     }
 
     /**
@@ -394,7 +396,7 @@ public interface DataTransferProtocol {
         int pipelineSize, BlockConstructionStage stage,
         long newGs, long minBytesRcvd, long maxBytesRcvd,
         String client, DatanodeInfo src, DatanodeInfo[] targets,
-        BlockAccessToken accesstoken) throws IOException;
+        Token<BlockTokenIdentifier> blockToken) throws IOException;
 
     /** Receive OP_REPLACE_BLOCK */
     private void opReplaceBlock(DataInputStream in) throws IOException {
@@ -402,9 +404,9 @@ public interface DataTransferProtocol {
       final long blockGs = in.readLong();
       final String sourceId = Text.readString(in); // read del hint
       final DatanodeInfo src = DatanodeInfo.read(in); // read proxy source
-      final BlockAccessToken accesstoken = readAccessToken(in);
+      final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
 
-      opReplaceBlock(in, blockId, blockGs, sourceId, src, accesstoken);
+      opReplaceBlock(in, blockId, blockGs, sourceId, src, blockToken);
     }
 
     /**
@@ -413,44 +415,46 @@ public interface DataTransferProtocol {
      */
     protected abstract void opReplaceBlock(DataInputStream in,
         long blockId, long blockGs, String sourceId, DatanodeInfo src,
-        BlockAccessToken accesstoken) throws IOException;
+        Token<BlockTokenIdentifier> blockToken) throws IOException;
 
     /** Receive OP_COPY_BLOCK */
     private void opCopyBlock(DataInputStream in) throws IOException {
       final long blockId = in.readLong();          
       final long blockGs = in.readLong();
-      final BlockAccessToken accesstoken = readAccessToken(in);
+      final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
 
-      opCopyBlock(in, blockId, blockGs, accesstoken);
+      opCopyBlock(in, blockId, blockGs, blockToken);
     }
 
     /**
-     * Abstract OP_COPY_BLOCK method.
-     * It is used for balancing purpose; send to a proxy source.
+     * Abstract OP_COPY_BLOCK method. It is used for balancing purposes; the
+     * request is sent to a proxy source.
      */
-    protected abstract void opCopyBlock(DataInputStream in,
-        long blockId, long blockGs, BlockAccessToken accesstoken) throws IOException;
+    protected abstract void opCopyBlock(DataInputStream in, long blockId,
+        long blockGs, Token<BlockTokenIdentifier> blockToken)
+        throws IOException;
 
     /** Receive OP_BLOCK_CHECKSUM */
     private void opBlockChecksum(DataInputStream in) throws IOException {
       final long blockId = in.readLong();          
       final long blockGs = in.readLong();
-      final BlockAccessToken accesstoken = readAccessToken(in);
+      final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
 
-      opBlockChecksum(in, blockId, blockGs, accesstoken);
+      opBlockChecksum(in, blockId, blockGs, blockToken);
     }
 
     /**
      * Abstract OP_BLOCK_CHECKSUM method.
      * Get the checksum of a block 
      */
-    protected abstract void opBlockChecksum(DataInputStream in,
-        long blockId, long blockGs, BlockAccessToken accesstoken) throws IOException;
+    protected abstract void opBlockChecksum(DataInputStream in, long blockId,
+        long blockGs, Token<BlockTokenIdentifier> blockToken)
+        throws IOException;
 
     /** Read a block token */
-    static private BlockAccessToken readAccessToken(DataInputStream in
+    static private Token<BlockTokenIdentifier> readBlockToken(DataInputStream in
         ) throws IOException {
-      final BlockAccessToken t = new BlockAccessToken();
+      final Token<BlockTokenIdentifier> t = new Token<BlockTokenIdentifier>();
       t.readFields(in);
       return t; 
     }

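On the streaming side the token travels as a plain Writable: every Sender op ends with blockToken.write(out), and readBlockToken above reconstructs it with readFields. A round-trip sketch with in-memory streams standing in for the socket:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;

public class WireSketch {
  public static void main(String[] args) throws IOException {
    Token<BlockTokenIdentifier> sent = new Token<BlockTokenIdentifier>();

    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    sent.write(new DataOutputStream(buf));             // sender side

    Token<BlockTokenIdentifier> received = new Token<BlockTokenIdentifier>();
    received.readFields(new DataInputStream(
        new ByteArrayInputStream(buf.toByteArray()))); // receiver side
  }
}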
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java Wed May 26 23:31:56 2010
@@ -17,8 +17,9 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.security.token.Token;
 
 import java.io.*;
 
@@ -44,7 +45,7 @@ public class LocatedBlock implements Wri
   // else false. If block has few corrupt replicas, they are filtered and 
   // their locations are not part of this object
   private boolean corrupt;
-  private BlockAccessToken accessToken = new BlockAccessToken();
+  private Token<BlockTokenIdentifier> blockToken = new Token<BlockTokenIdentifier>();
 
   /**
    */
@@ -78,12 +79,12 @@ public class LocatedBlock implements Wri
     }
   }
 
-  public BlockAccessToken getAccessToken() {
-    return accessToken;
+  public Token<BlockTokenIdentifier> getBlockToken() {
+    return blockToken;
   }
 
-  public void setAccessToken(BlockAccessToken token) {
-    this.accessToken = token;
+  public void setBlockToken(Token<BlockTokenIdentifier> token) {
+    this.blockToken = token;
   }
 
   /**
@@ -122,7 +123,7 @@ public class LocatedBlock implements Wri
   // Writable
   ///////////////////////////////////////////
   public void write(DataOutput out) throws IOException {
-    accessToken.write(out);
+    blockToken.write(out);
     out.writeBoolean(corrupt);
     out.writeLong(offset);
     b.write(out);
@@ -133,7 +134,7 @@ public class LocatedBlock implements Wri
   }
 
   public void readFields(DataInput in) throws IOException {
-    accessToken.readFields(in);
+    blockToken.readFields(in);
     this.corrupt = in.readBoolean();
     offset = in.readLong();
     this.b = new Block();

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java?rev=948634&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java Wed May 26 23:31:56 2010
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security.token.block;
+
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+
+/**
+ * Key used for generating and verifying block tokens
+ */
+public class BlockKey extends DelegationKey {
+
+  public BlockKey() {
+    super();
+  }
+
+  public BlockKey(int keyId, long expiryDate, SecretKey key) {
+    super(keyId, expiryDate, key);
+  }
+}
\ No newline at end of file

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java?rev=948634&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java Wed May 26 23:31:56 2010
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.EnumSet;
+
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+public class BlockTokenIdentifier extends TokenIdentifier {
+  static final Text KIND_NAME = new Text("HDFS_BLOCK_TOKEN");
+
+  private long expiryDate;
+  private int keyId;
+  private String userId;
+  private long blockId;
+  private EnumSet<AccessMode> modes;
+
+  public BlockTokenIdentifier() {
+    this(null, 0, EnumSet.noneOf(AccessMode.class));
+  }
+
+  public BlockTokenIdentifier(String userId, long blockId,
+      EnumSet<AccessMode> modes) {
+    this.userId = userId;
+    this.blockId = blockId;
+    this.modes = modes == null ? EnumSet.noneOf(AccessMode.class) : modes;
+  }
+
+  @Override
+  public Text getKind() {
+    return KIND_NAME;
+  }
+
+  @Override
+  public UserGroupInformation getUser() {
+    if (userId == null || "".equals(userId)) {
+      return UserGroupInformation.createRemoteUser(Long.toString(blockId));
+    }
+    return UserGroupInformation.createRemoteUser(userId);
+  }
+
+  public long getExpiryDate() {
+    return expiryDate;
+  }
+
+  public void setExpiryDate(long expiryDate) {
+    this.expiryDate = expiryDate;
+  }
+
+  public int getKeyId() {
+    return this.keyId;
+  }
+
+  public void setKeyId(int keyId) {
+    this.keyId = keyId;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public long getBlockId() {
+    return blockId;
+  }
+
+  public EnumSet<AccessMode> getAccessModes() {
+    return modes;
+  }
+
+  public String toString() {
+    return "block_token_identifier (expiryDate=" + this.getExpiryDate()
+        + ", keyId=" + this.getKeyId() + ", userId=" + this.getUserId()
+        + ", blockId=" + this.getBlockId() + ", access modes="
+        + this.getAccessModes() + ")";
+  }
+
+  static boolean isEqual(Object a, Object b) {
+    return a == null ? b == null : a.equals(b);
+  }
+
+  /** {@inheritDoc} */
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj instanceof BlockTokenIdentifier) {
+      BlockTokenIdentifier that = (BlockTokenIdentifier) obj;
+      return this.expiryDate == that.expiryDate && this.keyId == that.keyId
+          && isEqual(this.userId, that.userId) && this.blockId == that.blockId
+          && isEqual(this.modes, that.modes);
+    }
+    return false;
+  }
+
+  /** {@inheritDoc} */
+  public int hashCode() {
+    return (int) expiryDate ^ keyId ^ (int) blockId ^ modes.hashCode()
+        ^ (userId == null ? 0 : userId.hashCode());
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    expiryDate = WritableUtils.readVLong(in);
+    keyId = WritableUtils.readVInt(in);
+    userId = WritableUtils.readString(in);
+    blockId = WritableUtils.readVLong(in);
+    int length = WritableUtils.readVInt(in);
+    for (int i = 0; i < length; i++) {
+      modes.add(WritableUtils.readEnum(in, AccessMode.class));
+    }
+  }
+
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVLong(out, expiryDate);
+    WritableUtils.writeVInt(out, keyId);
+    WritableUtils.writeString(out, userId);
+    WritableUtils.writeVLong(out, blockId);
+    WritableUtils.writeVInt(out, modes.size());
+    for (AccessMode aMode : modes) {
+      WritableUtils.writeEnum(out, aMode);
+    }
+  }
+}

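A BlockTokenIdentifier binds a user, a block ID, and a set of access modes; expiryDate and keyId are filled in by the secret manager when it signs the token (see createPassword in the next file). A small sketch; the user name and block ID are illustrative:

import java.util.EnumSet;

import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;

public class IdentifierSketch {
  public static void main(String[] args) {
    BlockTokenIdentifier id = new BlockTokenIdentifier("alice", 1234L,
        EnumSet.of(AccessMode.READ));
    // expiryDate and keyId are still 0 here; the secret manager sets them
    // in createPassword() before computing the token password.
    System.out.println(id);
  }
}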
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java?rev=948634&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java Wed May 26 23:31:56 2010
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.security.SecureRandom;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * BlockTokenSecretManager can be instantiated in 2 modes, master mode and slave
+ * mode. Master can generate new block keys and export block keys to slaves,
+ * while slaves can only import and use block keys received from master. Both
+ * master and slave can generate and verify block tokens. Typically, master mode
+ * is used by NN and slave mode is used by DN.
+ */
+public class BlockTokenSecretManager extends
+    SecretManager<BlockTokenIdentifier> {
+  public static final Log LOG = LogFactory
+      .getLog(BlockTokenSecretManager.class);
+  public static final Token<BlockTokenIdentifier> DUMMY_TOKEN = new Token<BlockTokenIdentifier>();
+
+  private final boolean isMaster;
+  /*
+   * keyUpdateInterval is the interval at which the NN updates its block keys.
+   * It should be set long enough that all live DNs and the Balancer have
+   * synced their block keys with the NN at least once during each interval.
+   */
+  private final long keyUpdateInterval;
+  private volatile long tokenLifetime;
+  private int serialNo = new SecureRandom().nextInt();
+  private BlockKey currentKey;
+  private BlockKey nextKey;
+  private Map<Integer, BlockKey> allKeys;
+
+  public static enum AccessMode {
+    READ, WRITE, COPY, REPLACE
+  };
+
+  /**
+   * Constructor
+   * 
+   * @param isMaster
+   * @param keyUpdateInterval
+   * @param tokenLifetime
+   * @throws IOException
+   */
+  public BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval,
+      long tokenLifetime) throws IOException {
+    this.isMaster = isMaster;
+    this.keyUpdateInterval = keyUpdateInterval;
+    this.tokenLifetime = tokenLifetime;
+    this.allKeys = new HashMap<Integer, BlockKey>();
+    generateKeys();
+  }
+
+  /** Initialize block keys */
+  private synchronized void generateKeys() {
+    if (!isMaster)
+      return;
+    /*
+     * Need to set estimated expiry dates for currentKey and nextKey so that if
+     * NN crashes, DN can still expire those keys. NN will stop using the newly
+     * generated currentKey after the first keyUpdateInterval, however it may
+     * still be used by DN and Balancer to generate new tokens before they get a
+     * chance to sync their keys with NN. Since we require keyUpdateInterval
+     * to be long enough that all live DNs and the Balancer will sync their
+     * keys with NN at least once during the period, the estimated expiry date for
+     * currentKey is set to now() + 2 * keyUpdateInterval + tokenLifetime.
+     * Similarly, the estimated expiry date for nextKey is one keyUpdateInterval
+     * more.
+     */
+    serialNo++;
+    currentKey = new BlockKey(serialNo, System.currentTimeMillis() + 2
+        * keyUpdateInterval + tokenLifetime, generateSecret());
+    serialNo++;
+    nextKey = new BlockKey(serialNo, System.currentTimeMillis() + 3
+        * keyUpdateInterval + tokenLifetime, generateSecret());
+    allKeys.put(currentKey.getKeyId(), currentKey);
+    allKeys.put(nextKey.getKeyId(), nextKey);
+  }
+
+  /** Export block keys, only to be used in master mode */
+  public synchronized ExportedBlockKeys exportKeys() {
+    if (!isMaster)
+      return null;
+    if (LOG.isDebugEnabled())
+      LOG.debug("Exporting access keys");
+    return new ExportedBlockKeys(true, keyUpdateInterval, tokenLifetime,
+        currentKey, allKeys.values().toArray(new BlockKey[0]));
+  }
+
+  private synchronized void removeExpiredKeys() {
+    long now = System.currentTimeMillis();
+    for (Iterator<Map.Entry<Integer, BlockKey>> it = allKeys.entrySet()
+        .iterator(); it.hasNext();) {
+      Map.Entry<Integer, BlockKey> e = it.next();
+      if (e.getValue().getExpiryDate() < now) {
+        it.remove();
+      }
+    }
+  }
+
+  /**
+   * Set block keys, only to be used in slave mode
+   */
+  public synchronized void setKeys(ExportedBlockKeys exportedKeys)
+      throws IOException {
+    if (isMaster || exportedKeys == null)
+      return;
+    LOG.info("Setting block keys");
+    removeExpiredKeys();
+    this.currentKey = exportedKeys.getCurrentKey();
+    BlockKey[] receivedKeys = exportedKeys.getAllKeys();
+    for (int i = 0; i < receivedKeys.length; i++) {
+      if (receivedKeys[i] == null)
+        continue;
+      this.allKeys.put(receivedKeys[i].getKeyId(), receivedKeys[i]);
+    }
+  }
+
+  /**
+   * Update block keys, only to be used in master mode
+   */
+  public synchronized void updateKeys() throws IOException {
+    if (!isMaster)
+      return;
+    LOG.info("Updating block keys");
+    removeExpiredKeys();
+    // set final expiry date of retiring currentKey
+    allKeys.put(currentKey.getKeyId(), new BlockKey(currentKey.getKeyId(),
+        System.currentTimeMillis() + keyUpdateInterval + tokenLifetime,
+        currentKey.getKey()));
+    // update the estimated expiry date of new currentKey
+    currentKey = new BlockKey(nextKey.getKeyId(), System.currentTimeMillis()
+        + 2 * keyUpdateInterval + tokenLifetime, nextKey.getKey());
+    allKeys.put(currentKey.getKeyId(), currentKey);
+    // generate a new nextKey
+    serialNo++;
+    nextKey = new BlockKey(serialNo, System.currentTimeMillis() + 3
+        * keyUpdateInterval + tokenLifetime, generateSecret());
+    allKeys.put(nextKey.getKeyId(), nextKey);
+  }
+
+  /** Generate a block token for the current user */
+  public Token<BlockTokenIdentifier> generateToken(Block block,
+      EnumSet<AccessMode> modes) throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    String userID = (ugi == null ? null : ugi.getShortUserName());
+    return generateToken(userID, block, modes);
+  }
+
+  /** Generate a block token for a specified user */
+  public Token<BlockTokenIdentifier> generateToken(String userId, Block block,
+      EnumSet<AccessMode> modes) throws IOException {
+    BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block
+        .getBlockId(), modes);
+    return new Token<BlockTokenIdentifier>(id, this);
+  }
+
+  /**
+   * Check if access should be allowed. userID is not checked if null. This
+   * method doesn't check if token password is correct. It should be used only
+   * when token password has already been verified (e.g., in the RPC layer).
+   */
+  public void checkAccess(BlockTokenIdentifier id, String userId, Block block,
+      AccessMode mode) throws InvalidToken {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Checking access for user=" + userId + ", block=" + block
+          + ", access mode=" + mode + " using " + id.toString());
+    }
+    if (userId != null && !userId.equals(id.getUserId())) {
+      throw new InvalidToken("Block token with " + id.toString()
+          + " doesn't belong to user " + userId);
+    }
+    if (id.getBlockId() != block.getBlockId()) {
+      throw new InvalidToken("Block token with " + id.toString()
+          + " doesn't apply to block " + block);
+    }
+    if (isExpired(id.getExpiryDate())) {
+      throw new InvalidToken("Block token with " + id.toString()
+          + " is expired.");
+    }
+    if (!id.getAccessModes().contains(mode)) {
+      throw new InvalidToken("Block token with " + id.toString()
+          + " doesn't have " + mode + " permission");
+    }
+  }
+
+  /** Check if access should be allowed. userID is not checked if null */
+  public void checkAccess(Token<BlockTokenIdentifier> token, String userId,
+      Block block, AccessMode mode) throws InvalidToken {
+    BlockTokenIdentifier id = new BlockTokenIdentifier();
+    try {
+      id.readFields(new DataInputStream(new ByteArrayInputStream(token
+          .getIdentifier())));
+    } catch (IOException e) {
+      throw new InvalidToken(
+          "Unable to de-serialize block token identifier for user=" + userId
+              + ", block=" + block + ", access mode=" + mode);
+    }
+    checkAccess(id, userId, block, mode);
+    if (!Arrays.equals(retrievePassword(id), token.getPassword())) {
+      throw new InvalidToken("Block token with " + id.toString()
+          + " doesn't have the correct token password");
+    }
+  }
+
+  private static boolean isExpired(long expiryDate) {
+    return System.currentTimeMillis() > expiryDate;
+  }
+
+  /**
+   * Check if a token is expired. For unit test only. Returns true when the
+   * token is expired, false otherwise.
+   */
+  static boolean isTokenExpired(Token<BlockTokenIdentifier> token)
+      throws IOException {
+    ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+    DataInputStream in = new DataInputStream(buf);
+    long expiryDate = WritableUtils.readVLong(in);
+    return isExpired(expiryDate);
+  }
+
+  /** set token lifetime. */
+  public void setTokenLifetime(long tokenLifetime) {
+    this.tokenLifetime = tokenLifetime;
+  }
+
+  /**
+   * Create an empty block token identifier
+   * 
+   * @return a newly created empty block token identifier
+   */
+  @Override
+  public BlockTokenIdentifier createIdentifier() {
+    return new BlockTokenIdentifier();
+  }
+
+  /**
+   * Create a new password/secret for the given block token identifier.
+   * 
+   * @param identifier
+   *          the block token identifier
+   * @return token password/secret
+   */
+  @Override
+  protected byte[] createPassword(BlockTokenIdentifier identifier) {
+    BlockKey key = null;
+    synchronized (this) {
+      key = currentKey;
+    }
+    if (key == null)
+      throw new IllegalStateException("currentKey hasn't been initialized.");
+    identifier.setExpiryDate(System.currentTimeMillis() + tokenLifetime);
+    identifier.setKeyId(key.getKeyId());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Generating block token for " + identifier.toString());
+    }
+    return createPassword(identifier.getBytes(), key.getKey());
+  }
+
+  /**
+   * Look up the token password/secret for the given block token identifier.
+   * 
+   * @param identifier
+   *          the block token identifier to look up
+   * @return token password/secret as byte[]
+   * @throws InvalidToken
+   */
+  @Override
+  public byte[] retrievePassword(BlockTokenIdentifier identifier)
+      throws InvalidToken {
+    if (isExpired(identifier.getExpiryDate())) {
+      throw new InvalidToken("Block token with " + identifier.toString()
+          + " is expired.");
+    }
+    BlockKey key = null;
+    synchronized (this) {
+      key = allKeys.get(identifier.getKeyId());
+    }
+    if (key == null) {
+      throw new InvalidToken("Can't re-compute password for "
+          + identifier.toString() + ", since the required block key (keyID="
+          + identifier.getKeyId() + ") doesn't exist.");
+    }
+    return createPassword(identifier.getBytes(), key.getKey());
+  }
+}

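Putting the secret manager's master/slave contract together: the NN (master) generates keys and signs tokens, exports its keys, and a DN (slave) imports them so it can verify tokens it has never seen. A hedged end-to-end sketch; the intervals and block ID are illustrative:

import java.io.IOException;
import java.util.EnumSet;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
import org.apache.hadoop.security.token.Token;

public class KeyFlowSketch {
  public static void main(String[] args) throws IOException {
    long keyUpdateInterval = 10 * 60 * 1000L; // illustrative: 10 minutes
    long tokenLifetime = 10 * 60 * 1000L;

    // Master (NN): generates block keys and signs tokens with currentKey.
    BlockTokenSecretManager master =
        new BlockTokenSecretManager(true, keyUpdateInterval, tokenLifetime);
    // Slave (DN): can only import keys exported by the master.
    BlockTokenSecretManager slave =
        new BlockTokenSecretManager(false, keyUpdateInterval, tokenLifetime);
    slave.setKeys(master.exportKeys());

    Block block = new Block(1234L);
    Token<BlockTokenIdentifier> token =
        master.generateToken(block, EnumSet.of(AccessMode.READ));
    // Throws InvalidToken unless the token authorizes READ on this block.
    slave.checkAccess(token, null, block, AccessMode.READ);
  }
}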
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java?rev=948634&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java Wed May 26 23:31:56 2010
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.util.Collection;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+
+/**
+ * A block token selector for HDFS
+ */
+public class BlockTokenSelector implements TokenSelector<BlockTokenIdentifier> {
+
+  @SuppressWarnings("unchecked")
+  public Token<BlockTokenIdentifier> selectToken(Text service,
+      Collection<Token<? extends TokenIdentifier>> tokens) {
+    if (service == null) {
+      return null;
+    }
+    for (Token<? extends TokenIdentifier> token : tokens) {
+      if (BlockTokenIdentifier.KIND_NAME.equals(token.getKind())) {
+        return (Token<BlockTokenIdentifier>) token;
+      }
+    }
+    return null;
+  }
+}

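This selector is what @TokenInfo(BlockTokenSelector.class) on ClientDatanodeProtocol points at: given a UGI's credentials, it returns the first token of kind HDFS_BLOCK_TOKEN (any non-null service). A sketch of that lookup; the user name, service string, and empty token bytes are illustrative, and KIND_NAME is package-private, hence the string literal:

import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;

public class SelectorSketch {
  public static void main(String[] args) {
    UserGroupInformation ugi = UserGroupInformation.createRemoteUser("blk_1234");
    // An empty token has an empty kind and is skipped by the selector.
    ugi.addToken(new Token<BlockTokenIdentifier>());
    // A token of the right kind is selected regardless of service value.
    ugi.addToken(new Token<BlockTokenIdentifier>(new byte[0], new byte[0],
        new Text("HDFS_BLOCK_TOKEN"), new Text("")));

    Token<BlockTokenIdentifier> picked = new BlockTokenSelector()
        .selectToken(new Text("127.0.0.1:50020"), ugi.getTokens());
    System.out.println(picked == null ? "none" : picked.getKind());
  }
}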
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java?rev=948634&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java Wed May 26 23:31:56 2010
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * Object for passing block keys
+ */
+public class ExportedBlockKeys implements Writable {
+  public static final ExportedBlockKeys DUMMY_KEYS = new ExportedBlockKeys();
+  private boolean isBlockTokenEnabled;
+  private long keyUpdateInterval;
+  private long tokenLifetime;
+  private BlockKey currentKey;
+  private BlockKey[] allKeys;
+
+  public ExportedBlockKeys() {
+    this(false, 0, 0, new BlockKey(), new BlockKey[0]);
+  }
+
+  ExportedBlockKeys(boolean isBlockTokenEnabled, long keyUpdateInterval,
+      long tokenLifetime, BlockKey currentKey, BlockKey[] allKeys) {
+    this.isBlockTokenEnabled = isBlockTokenEnabled;
+    this.keyUpdateInterval = keyUpdateInterval;
+    this.tokenLifetime = tokenLifetime;
+    this.currentKey = currentKey == null ? new BlockKey() : currentKey;
+    this.allKeys = allKeys == null ? new BlockKey[0] : allKeys;
+  }
+
+  public boolean isBlockTokenEnabled() {
+    return isBlockTokenEnabled;
+  }
+
+  public long getKeyUpdateInterval() {
+    return keyUpdateInterval;
+  }
+
+  public long getTokenLifetime() {
+    return tokenLifetime;
+  }
+
+  public BlockKey getCurrentKey() {
+    return currentKey;
+  }
+
+  public BlockKey[] getAllKeys() {
+    return allKeys;
+  }
+  
+  // ///////////////////////////////////////////////
+  // Writable
+  // ///////////////////////////////////////////////
+  static { // register a ctor
+    WritableFactories.setFactory(ExportedBlockKeys.class,
+        new WritableFactory() {
+          public Writable newInstance() {
+            return new ExportedBlockKeys();
+          }
+        });
+  }
+
+  /** Serialize the flags, intervals and keys to <code>out</code>. */
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(isBlockTokenEnabled);
+    out.writeLong(keyUpdateInterval);
+    out.writeLong(tokenLifetime);
+    currentKey.write(out);
+    out.writeInt(allKeys.length);
+    for (int i = 0; i < allKeys.length; i++) {
+      allKeys[i].write(out);
+    }
+  }
+
+  /** Deserialize the flags, intervals and keys from <code>in</code>. */
+  public void readFields(DataInput in) throws IOException {
+    isBlockTokenEnabled = in.readBoolean();
+    keyUpdateInterval = in.readLong();
+    tokenLifetime = in.readLong();
+    currentKey.readFields(in);
+    this.allKeys = new BlockKey[in.readInt()];
+    for (int i = 0; i < allKeys.length; i++) {
+      allKeys[i] = new BlockKey();
+      allKeys[i].readFields(in);
+    }
+  }
+
+}
\ No newline at end of file

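Since ExportedBlockKeys is a plain Writable, its wire format can be exercised with an in-memory round trip, roughly what the RPC layer does when the NameNode ships keys to DataNodes and the Balancer. A minimal sketch using the DUMMY_KEYS instance defined above:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    // Serialize...
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    ExportedBlockKeys sent = ExportedBlockKeys.DUMMY_KEYS;
    sent.write(new DataOutputStream(buf));

    // ...and deserialize into a fresh instance.
    ExportedBlockKeys received = new ExportedBlockKeys();
    received.readFields(new DataInputStream(
        new ByteArrayInputStream(buf.toByteArray())));
    assert received.isBlockTokenEnabled() == sent.isBlockTokenEnabled();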
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java?rev=948634&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java Wed May 26 23:31:56 2010
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.io.IOException;
+
+/**
+ * Block token verification failed.
+ */
+public class InvalidBlockTokenException extends IOException {
+  private static final long serialVersionUID = 168L;
+
+  public InvalidBlockTokenException() {
+    super();
+  }
+
+  public InvalidBlockTokenException(String msg) {
+    super(msg);
+  }
+}

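Block tokens expire after a fixed lifetime, so a long-lived reader can hit this exception even though its token was valid when the block locations were first fetched. One plausible caller-side reaction, sketched with hypothetical helpers, is to refetch locations (and with them a freshly generated token) once before giving up:

    try {
      readBlockFromDatanode(lblk);       // hypothetical read attempt
    } catch (InvalidBlockTokenException e) {
      // The token may simply have aged out: fresh locations from the
      // NameNode carry a new token, so refetch and retry once.
      lblk = refetchBlockLocations();    // hypothetical helper
      readBlockFromDatanode(lblk);
    }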
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Wed May 26 23:31:56 2010
@@ -61,9 +61,9 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
-import org.apache.hadoop.hdfs.security.AccessTokenHandler;
-import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -81,6 +81,7 @@ import org.apache.hadoop.ipc.RemoteExcep
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
@@ -199,10 +200,10 @@ public class Balancer implements Tool {
   private NamenodeProtocol namenode;
   private ClientProtocol client;
   private FileSystem fs;
-  private boolean isAccessTokenEnabled;
+  private boolean isBlockTokenEnabled;
   private boolean shouldRun;
   private long keyUpdaterInterval;
-  private AccessTokenHandler accessTokenHandler;
+  private BlockTokenSecretManager blockTokenSecretManager;
   private Daemon keyupdaterthread = null; // AccessKeyUpdater thread
   private final static Random rnd = new Random();
   
@@ -367,11 +368,11 @@ public class Balancer implements Tool {
     
     /* Send a block replace request to the output stream*/
     private void sendRequest(DataOutputStream out) throws IOException {
-      BlockAccessToken accessToken = BlockAccessToken.DUMMY_TOKEN;
-      if (isAccessTokenEnabled) {
-        accessToken = accessTokenHandler.generateToken(null, block.getBlock()
-            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.REPLACE,
-            AccessTokenHandler.AccessMode.COPY));
+      Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
+      if (isBlockTokenEnabled) {
+        accessToken = blockTokenSecretManager.generateToken(null, block
+            .getBlock(), EnumSet.of(BlockTokenSecretManager.AccessMode.REPLACE,
+            BlockTokenSecretManager.AccessMode.COPY));
       }
       DataTransferProtocol.Sender.opReplaceBlock(out,
           block.getBlock().getBlockId(), block.getBlock().getGenerationStamp(),
@@ -860,25 +861,25 @@ public class Balancer implements Tool {
     this.namenode = createNamenode(conf);
     this.client = DFSClient.createNamenode(conf);
     this.fs = FileSystem.get(conf);
-    ExportedAccessKeys keys = namenode.getAccessKeys();
-    this.isAccessTokenEnabled = keys.isAccessTokenEnabled();
-    if (isAccessTokenEnabled) {
-      long accessKeyUpdateInterval = keys.getKeyUpdateInterval();
-      long accessTokenLifetime = keys.getTokenLifetime();
-      LOG.info("Access token params received from NN: keyUpdateInterval="
-          + accessKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
-          + accessTokenLifetime / (60 * 1000) + " min(s)");
-      this.accessTokenHandler = new AccessTokenHandler(false,
-          accessKeyUpdateInterval, accessTokenLifetime);
-      this.accessTokenHandler.setKeys(keys);
+    ExportedBlockKeys keys = namenode.getBlockKeys();
+    this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
+    if (isBlockTokenEnabled) {
+      long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
+      long blockTokenLifetime = keys.getTokenLifetime();
+      LOG.info("Block token params received from NN: keyUpdateInterval="
+          + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
+          + blockTokenLifetime / (60 * 1000) + " min(s)");
+      this.blockTokenSecretManager = new BlockTokenSecretManager(false,
+          blockKeyUpdateInterval, blockTokenLifetime);
+      this.blockTokenSecretManager.setKeys(keys);
       /*
-       * Balancer should sync its access keys with NN more frequently than NN
-       * updates its access keys
+       * Balancer should sync its block keys with NN more frequently than NN
+       * updates its block keys
        */
-      this.keyUpdaterInterval = accessKeyUpdateInterval / 4;
-      LOG.info("Balancer will update its access keys every "
+      this.keyUpdaterInterval = blockKeyUpdateInterval / 4;
+      LOG.info("Balancer will update its block keys every "
           + keyUpdaterInterval / (60 * 1000) + " minute(s)");
-      this.keyupdaterthread = new Daemon(new AccessKeyUpdater());
+      this.keyupdaterthread = new Daemon(new BlockKeyUpdater());
       this.shouldRun = true;
       this.keyupdaterthread.start();
     }
@@ -887,12 +888,12 @@ public class Balancer implements Tool {
   /**
    * Periodically updates access keys.
    */
-  class AccessKeyUpdater implements Runnable {
+  class BlockKeyUpdater implements Runnable {
 
     public void run() {
       while (shouldRun) {
         try {
-          accessTokenHandler.setKeys(namenode.getAccessKeys());
+          blockTokenSecretManager.setKeys(namenode.getBlockKeys());
         } catch (Exception e) {
           LOG.error(StringUtils.stringifyException(e));
         }

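The hunk above shows only the sync call inside BlockKeyUpdater.run(); the pacing between syncs falls outside the diff context. A sketch of the loop as a whole, assuming it simply sleeps for keyUpdaterInterval (a quarter of the NameNode's key update interval, per the init code above) between refreshes:

    class BlockKeyUpdater implements Runnable {
      public void run() {
        while (shouldRun) {
          try {
            blockTokenSecretManager.setKeys(namenode.getBlockKeys());
          } catch (Exception e) {
            LOG.error(StringUtils.stringifyException(e));
          }
          try {
            Thread.sleep(keyUpdaterInterval);  // 1/4 of the NN update interval
          } catch (InterruptedException ie) {
            return;  // daemon shutting down
          }
        }
      }
    }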
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Wed May 26 23:31:56 2010
@@ -41,7 +41,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeJspHelper;
 import org.apache.hadoop.hdfs.server.namenode.DatanodeDescriptor;
@@ -107,13 +107,10 @@ public class JspHelper {
     return chosenNode;
   }
 
-  public static void streamBlockInAscii(InetSocketAddress addr, long blockId, 
-                                 BlockAccessToken accessToken, long genStamp, 
-                                 long blockSize, 
-                                 long offsetIntoBlock, long chunkSizeToView, 
-                                 JspWriter out,
-                                 Configuration conf) 
-    throws IOException {
+  public static void streamBlockInAscii(InetSocketAddress addr, long blockId,
+      Token<BlockTokenIdentifier> blockToken, long genStamp, long blockSize,
+      long offsetIntoBlock, long chunkSizeToView, JspWriter out,
+      Configuration conf) throws IOException {
     if (chunkSizeToView == 0) return;
     Socket s = new Socket();
     s.connect(addr, HdfsConstants.READ_TIMEOUT);
@@ -124,7 +121,7 @@ public class JspHelper {
       // Use the block name for file name. 
       BlockReader blockReader = 
         BlockReader.newBlockReader(s, addr.toString() + ":" + blockId,
-                                             blockId, accessToken, genStamp ,offsetIntoBlock, 
+                                             blockId, blockToken, genStamp, offsetIntoBlock,
                                              amtToRead, 
                                              conf.getInt("io.file.buffer.size",
                                                          4096));

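A hypothetical JSP-side caller of the reworked signature, assuming the token rides along on the LocatedBlock returned by the NameNode via a getBlockToken() accessor (LocatedBlock.java is modified elsewhere in this commit); addr, chunkSizeToView, out and conf are taken from the surrounding page context:

    LocatedBlock lblk = pickBlock();   // hypothetical: from getBlockLocations()
    JspHelper.streamBlockInAscii(addr,
        lblk.getBlock().getBlockId(),
        lblk.getBlockToken(),          // assumed accessor added by this patch
        lblk.getBlock().getGenerationStamp(),
        lblk.getBlock().getNumBytes(),
        0L,                            // offsetIntoBlock
        chunkSizeToView, out, conf);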
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed May 26 23:31:56 2010
@@ -31,6 +31,7 @@ import java.net.UnknownHostException;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.security.NoSuchAlgorithmException;
+import java.security.PrivilegedExceptionAction;
 import java.security.SecureRandom;
 import java.util.AbstractList;
 import java.util.ArrayList;
@@ -40,6 +41,7 @@ import java.util.EnumSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -62,9 +64,9 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
-import org.apache.hadoop.hdfs.security.AccessTokenHandler;
-import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.Storage;
@@ -91,6 +93,8 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RPC;
@@ -203,9 +207,9 @@ public class DataNode extends Configured
   int socketWriteTimeout = 0;  
   boolean transferToAllowed = true;
   int writePacketSize = 0;
-  boolean isAccessTokenEnabled;
-  AccessTokenHandler accessTokenHandler;
-  boolean isAccessTokenInitialized = false;
+  boolean isBlockTokenEnabled;
+  BlockTokenSecretManager blockTokenSecretManager;
+  boolean isBlockTokenInitialized = false;
   
   public DataBlockScanner blockScanner = null;
   public Daemon blockScannerThread = null;
@@ -416,13 +420,17 @@ public class DataNode extends Configured
       ServiceAuthorizationManager.refresh(conf, new HDFSPolicyProvider());
        }
 
+    // BlockTokenSecretManager is created here, but it shouldn't be
+    // used until it is initialized in register().
+    this.blockTokenSecretManager = new BlockTokenSecretManager(false, 0, 0);
+    
     //init ipc server
     InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
         conf.get("dfs.datanode.ipc.address"));
-    ipcServer = RPC.getServer(DataNode.class, this,
-        ipcAddr.getHostName(), ipcAddr.getPort(), 
-        conf.getInt("dfs.datanode.handler.count", 3), false, conf);
-    ipcServer.start();
+    ipcServer = RPC.getServer(DataNode.class, this, ipcAddr.getHostName(),
+        ipcAddr.getPort(), conf.getInt("dfs.datanode.handler.count", 3), false,
+        conf, blockTokenSecretManager);
+    
     dnRegistration.setIpcPort(ipcServer.getListenerAddress().getPort());
 
     LOG.info("dnRegistration = " + dnRegistration);
@@ -492,14 +500,25 @@ public class DataNode extends Configured
   } 
 
   public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
-      DatanodeID datanodeid, Configuration conf) throws IOException {
-    InetSocketAddress addr = NetUtils.createSocketAddr(
+      DatanodeID datanodeid, final Configuration conf) throws IOException {
+    final InetSocketAddress addr = NetUtils.createSocketAddr(
         datanodeid.getHost() + ":" + datanodeid.getIpcPort());
     if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
       InterDatanodeProtocol.LOG.info("InterDatanodeProtocol addr=" + addr);
     }
-    return (InterDatanodeProtocol)RPC.getProxy(InterDatanodeProtocol.class,
-        InterDatanodeProtocol.versionID, addr, conf);
+    UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
+    try {
+      return loginUgi
+          .doAs(new PrivilegedExceptionAction<InterDatanodeProtocol>() {
+            public InterDatanodeProtocol run() throws IOException {
+              return (InterDatanodeProtocol) RPC.getProxy(
+                  InterDatanodeProtocol.class, InterDatanodeProtocol.versionID,
+                  addr, conf);
+            }
+          });
+    } catch (InterruptedException ie) {
+      throw new IOException(ie.getMessage());
+    }
   }
 
   public InetSocketAddress getNameNodeAddr() {
@@ -601,25 +620,24 @@ public class DataNode extends Configured
           + ". Expecting " + storage.getStorageID());
     }
     
-    if (!isAccessTokenInitialized) {
+    if (!isBlockTokenInitialized) {
       /* first time registering with NN */
-      ExportedAccessKeys keys = dnRegistration.exportedKeys;
-      this.isAccessTokenEnabled = keys.isAccessTokenEnabled();
-      if (isAccessTokenEnabled) {
-        long accessKeyUpdateInterval = keys.getKeyUpdateInterval();
-        long accessTokenLifetime = keys.getTokenLifetime();
-        LOG.info("Access token params received from NN: keyUpdateInterval="
-            + accessKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
-            + accessTokenLifetime / (60 * 1000) + " min(s)");
-        this.accessTokenHandler = new AccessTokenHandler(false,
-            accessKeyUpdateInterval, accessTokenLifetime);
-      }
-      isAccessTokenInitialized = true;
-    }
-
-    if (isAccessTokenEnabled) {
-      accessTokenHandler.setKeys(dnRegistration.exportedKeys);
-      dnRegistration.exportedKeys = ExportedAccessKeys.DUMMY_KEYS;
+      ExportedBlockKeys keys = dnRegistration.exportedKeys;
+      this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
+      if (isBlockTokenEnabled) {
+        long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
+        long blockTokenLifetime = keys.getTokenLifetime();
+        LOG.info("Block token params received from NN: keyUpdateInterval="
+            + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
+            + blockTokenLifetime / (60 * 1000) + " min(s)");
+        blockTokenSecretManager.setTokenLifetime(blockTokenLifetime);
+      }
+      isBlockTokenInitialized = true;
+    }
+
+    if (isBlockTokenEnabled) {
+      blockTokenSecretManager.setKeys(dnRegistration.exportedKeys);
+      dnRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS;
     }
 
     // random short delay - helps scatter the BR from all DNs
@@ -941,8 +959,8 @@ public class DataNode extends Configured
       break;
     case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
       LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
-      if (isAccessTokenEnabled) {
-        accessTokenHandler.setKeys(((KeyUpdateCommand) cmd).getExportedKeys());
+      if (isBlockTokenEnabled) {
+        blockTokenSecretManager.setKeys(((KeyUpdateCommand) cmd).getExportedKeys());
       }
       break;
     default:
@@ -1268,10 +1286,10 @@ public class DataNode extends Configured
         //
         // Header info
         //
-        BlockAccessToken accessToken = BlockAccessToken.DUMMY_TOKEN;
-        if (isAccessTokenEnabled) {
-          accessToken = accessTokenHandler.generateToken(null, b.getBlockId(),
-              EnumSet.of(AccessTokenHandler.AccessMode.WRITE));
+        Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
+        if (isBlockTokenEnabled) {
+        accessToken = blockTokenSecretManager.generateToken(null, b,
+            EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
         }
         DataTransferProtocol.Sender.opWriteBlock(out,
             b.getBlockId(), b.getGenerationStamp(), 0, 
@@ -1325,6 +1343,7 @@ public class DataNode extends Configured
 
     // start dataXceiveServer
     dataXceiverServer.start();
+    ipcServer.start();
         
     while (shouldRun) {
       try {
@@ -1798,6 +1817,24 @@ public class DataNode extends Configured
   /** {@inheritDoc} */
   @Override // ClientDataNodeProtocol
   public long getReplicaVisibleLength(final Block block) throws IOException {
+    if (isBlockTokenEnabled) {
+      Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
+          .getTokenIdentifiers();
+      if (tokenIds.size() != 1) {
+        throw new IOException("Can't authorize getReplicaVisibleLength(): "
+            + "expected exactly one BlockTokenIdentifier, but found "
+            + tokenIds.size());
+      }
+      for (TokenIdentifier tokenId : tokenIds) {
+        BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Got: " + id.toString());
+        }
+        blockTokenSecretManager.checkAccess(id, null, block,
+            BlockTokenSecretManager.AccessMode.WRITE);
+      }
+    }
+
     return data.getReplicaVisibleLength(block);
   }
 }
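
The new check requires the caller's UGI to carry exactly one BlockTokenIdentifier granting WRITE access to the block. A hedged sketch of the matching client side follows, with the proxy construction left as a hypothetical helper; createRemoteUser(), addToken() and doAs() are existing UserGroupInformation methods, and getBlockToken() is the accessor assumed above on LocatedBlock.

    final LocatedBlock lblk = pickBlock();   // hypothetical: from the NameNode
    UserGroupInformation ugi =
        UserGroupInformation.createRemoteUser("dfs-client"); // illustrative name
    ugi.addToken(lblk.getBlockToken());      // the WRITE token granted by the NN

    long visibleLen = ugi.doAs(new PrivilegedExceptionAction<Long>() {
      public Long run() throws IOException {
        ClientDatanodeProtocol dn = createProxy();  // hypothetical proxy setup
        return dn.getReplicaVisibleLength(lblk.getBlock());
      }
    });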