You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by at...@apache.org on 2012/07/06 20:59:56 UTC

svn commit: r1358347 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src: main/java/org/apache/hadoop/hdfs/ main/java/org/apache/hadoop/hdfs/security/token/block/ main/java/org/apache/hadoop/hdfs/server/balancer/ main/java/org/apache/hadoop/h...

Author: atm
Date: Fri Jul  6 18:59:55 2012
New Revision: 1358347

URL: http://svn.apache.org/viewvc?rev=1358347&view=rev
Log:
Fix issue with NN/DN re-registration.

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSClientAdapter.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1358347&r1=1358346&r2=1358347&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Fri Jul  6 18:59:55 2012
@@ -137,6 +137,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.net.InetAddresses;
@@ -832,6 +833,16 @@ public class DFSClient implements java.i
   public short getDefaultReplication() {
     return dfsClientConf.defaultReplication;
   }
+  
+  /*
+   * This is just a wrapper around callGetBlockLocations, but non-static so that
+   * we can stub it out for tests.
+   */
+  @VisibleForTesting
+  public LocatedBlocks getLocatedBlocks(String src, long start, long length)
+      throws IOException {
+    return callGetBlockLocations(namenode, src, start, length);
+  }
 
   /**
    * @see ClientProtocol#getBlockLocations(String, long, long)
@@ -879,7 +890,7 @@ public class DFSClient implements java.i
    */
   public BlockLocation[] getBlockLocations(String src, long start, 
     long length) throws IOException, UnresolvedLinkException {
-    LocatedBlocks blocks = callGetBlockLocations(namenode, src, start, length);
+    LocatedBlocks blocks = getLocatedBlocks(src, start, length);
     return DFSUtil.locatedBlocks2Locations(blocks);
   }
   

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1358347&r1=1358346&r2=1358347&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Fri Jul  6 18:59:55 2012
@@ -151,7 +151,7 @@ public class DFSInputStream extends FSIn
   }
 
   private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
-    LocatedBlocks newInfo = DFSClient.callGetBlockLocations(dfsClient.namenode, src, 0, prefetchSize);
+    LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0, prefetchSize);
     if (DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug("newInfo = " + newInfo);
     }
@@ -298,7 +298,7 @@ public class DFSInputStream extends FSIn
         targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
         // fetch more blocks
         LocatedBlocks newBlocks;
-        newBlocks = DFSClient.callGetBlockLocations(dfsClient.namenode, src, offset, prefetchSize);
+        newBlocks = dfsClient.getLocatedBlocks(src, offset, prefetchSize);
         assert (newBlocks != null) : "Could not find target position " + offset;
         locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
       }
@@ -322,7 +322,7 @@ public class DFSInputStream extends FSIn
     }
     // fetch blocks
     LocatedBlocks newBlocks;
-    newBlocks = DFSClient.callGetBlockLocations(dfsClient.namenode, src, offset, prefetchSize);
+    newBlocks = dfsClient.getLocatedBlocks(src, offset, prefetchSize);
     if (newBlocks == null) {
       throw new IOException("Could not find target position " + offset);
     }
@@ -391,7 +391,7 @@ public class DFSInputStream extends FSIn
         blk = locatedBlocks.get(blockIdx);
       if (blk == null || curOff < blk.getStartOffset()) {
         LocatedBlocks newBlocks;
-        newBlocks = DFSClient.callGetBlockLocations(dfsClient.namenode, src, curOff, remaining);
+        newBlocks = dfsClient.getLocatedBlocks(src, curOff, remaining);
         locatedBlocks.insertRange(blockIdx, newBlocks.getLocatedBlocks());
         continue;
       }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java?rev=1358347&r1=1358346&r2=1358347&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java Fri Jul  6 18:59:55 2012
@@ -127,25 +127,21 @@ public class HAUtil {
     }
     return null;
   }
-
+  
   /**
-   * Given the configuration for this node, return a Configuration object for
-   * the other node in an HA setup.
+   * Get the NN ID of the other node in an HA setup.
    * 
-   * @param myConf the configuration of this node
-   * @return the configuration of the other node in an HA setup
+   * @param conf the configuration of this node
+   * @return the NN ID of the other node in this nameservice
    */
-  public static Configuration getConfForOtherNode(
-      Configuration myConf) {
-    
-    String nsId = DFSUtil.getNamenodeNameServiceId(myConf);
+  public static String getNameNodeIdOfOtherNode(Configuration conf, String nsId) {
     Preconditions.checkArgument(nsId != null,
         "Could not determine namespace id. Please ensure that this " +
         "machine is one of the machines listed as a NN RPC address, " +
         "or configure " + DFSConfigKeys.DFS_NAMESERVICE_ID);
     
-    Collection<String> nnIds = DFSUtil.getNameNodeIds(myConf, nsId);
-    String myNNId = myConf.get(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY);
+    Collection<String> nnIds = DFSUtil.getNameNodeIds(conf, nsId);
+    String myNNId = conf.get(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY);
     Preconditions.checkArgument(nnIds != null,
         "Could not determine namenode ids in namespace '%s'. " +
         "Please configure " +
@@ -165,11 +161,25 @@ public class HAUtil {
     ArrayList<String> nnSet = Lists.newArrayList(nnIds);
     nnSet.remove(myNNId);
     assert nnSet.size() == 1;
-    String activeNN = nnSet.get(0);
+    return nnSet.get(0);
+  }
+
+  /**
+   * Given the configuration for this node, return a Configuration object for
+   * the other node in an HA setup.
+   * 
+   * @param myConf the configuration of this node
+   * @return the configuration of the other node in an HA setup
+   */
+  public static Configuration getConfForOtherNode(
+      Configuration myConf) {
+    
+    String nsId = DFSUtil.getNamenodeNameServiceId(myConf);
+    String otherNn = getNameNodeIdOfOtherNode(myConf, nsId);
     
     // Look up the address of the active NN.
     Configuration confForOtherNode = new Configuration(myConf);
-    NameNode.initializeGenericKeys(confForOtherNode, nsId, activeNN);
+    NameNode.initializeGenericKeys(confForOtherNode, nsId, otherNn);
     return confForOtherNode;
   }
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java?rev=1358347&r1=1358346&r2=1358347&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java Fri Jul  6 18:59:55 2012
@@ -27,6 +27,8 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Manages a {@link BlockTokenSecretManager} per block pool. Routes the requests
  * given a block pool Id to corresponding {@link BlockTokenSecretManager}
@@ -96,11 +98,11 @@ public class BlockPoolTokenSecretManager
   }
 
   /**
-   * See {@link BlockTokenSecretManager#setKeys(ExportedBlockKeys)}
+   * See {@link BlockTokenSecretManager#addKeys(ExportedBlockKeys)}
    */
-  public void setKeys(String bpid, ExportedBlockKeys exportedKeys)
+  public void addKeys(String bpid, ExportedBlockKeys exportedKeys)
       throws IOException {
-    get(bpid).setKeys(exportedKeys);
+    get(bpid).addKeys(exportedKeys);
   }
 
   /**
@@ -110,4 +112,11 @@ public class BlockPoolTokenSecretManager
       EnumSet<AccessMode> of) throws IOException {
     return get(b.getBlockPoolId()).generateToken(b, of);
   }
+  
+  @VisibleForTesting
+  public void clearAllKeysForTesting() {
+    for (BlockTokenSecretManager btsm : map.values()) {
+      btsm.clearAllKeysForTesting();
+    }
+  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java?rev=1358347&r1=1358346&r2=1358347&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java Fri Jul  6 18:59:55 2012
@@ -37,6 +37,9 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
 /**
  * BlockTokenSecretManager can be instantiated in 2 modes, master mode and slave
  * mode. Master can generate new block keys and export block keys to slaves,
@@ -49,17 +52,24 @@ public class BlockTokenSecretManager ext
     SecretManager<BlockTokenIdentifier> {
   public static final Log LOG = LogFactory
       .getLog(BlockTokenSecretManager.class);
+  
+  // We use these in an HA setup to ensure that the pair of NNs produce block
+  // token serial numbers that are in different ranges.
+  private static final int LOW_MASK  = ~(1 << 31);
+  
   public static final Token<BlockTokenIdentifier> DUMMY_TOKEN = new Token<BlockTokenIdentifier>();
 
   private final boolean isMaster;
+  private int nnIndex;
+  
   /**
    * keyUpdateInterval is the interval that NN updates its block keys. It should
    * be set long enough so that all live DN's and Balancer should have sync'ed
    * their block keys with NN at least once during each interval.
    */
-  private final long keyUpdateInterval;
+  private long keyUpdateInterval;
   private volatile long tokenLifetime;
-  private int serialNo = new SecureRandom().nextInt();
+  private int serialNo;
   private BlockKey currentKey;
   private BlockKey nextKey;
   private Map<Integer, BlockKey> allKeys;
@@ -67,22 +77,47 @@ public class BlockTokenSecretManager ext
   public static enum AccessMode {
     READ, WRITE, COPY, REPLACE
   };
-
+  
+  /**
+   * Constructor for slaves.
+   * 
+   * @param keyUpdateInterval how often a new key will be generated
+   * @param tokenLifetime how long an individual token is valid
+   */
+  public BlockTokenSecretManager(long keyUpdateInterval,
+      long tokenLifetime) {
+    this(false, keyUpdateInterval, tokenLifetime);
+  }
+  
   /**
-   * Constructor
+   * Constructor for masters.
    * 
-   * @param isMaster
-   * @param keyUpdateInterval
-   * @param tokenLifetime
-   * @throws IOException
+   * @param keyUpdateInterval how often a new key will be generated
+   * @param tokenLifetime how long an individual token is valid
+   * @param isHaEnabled whether or not HA is enabled
+   * @param thisNnId the NN ID of this NN in an HA setup
+   * @param otherNnId the NN ID of the other NN in an HA setup
    */
-  public BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval,
-      long tokenLifetime) throws IOException {
+  public BlockTokenSecretManager(long keyUpdateInterval,
+      long tokenLifetime, int nnIndex) {
+    this(true, keyUpdateInterval, tokenLifetime);
+    Preconditions.checkArgument(nnIndex == 0 || nnIndex == 1);
+    this.nnIndex = nnIndex;
+    setSerialNo(new SecureRandom().nextInt());
+    generateKeys();
+  }
+  
+  private BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval,
+      long tokenLifetime) {
     this.isMaster = isMaster;
     this.keyUpdateInterval = keyUpdateInterval;
     this.tokenLifetime = tokenLifetime;
     this.allKeys = new HashMap<Integer, BlockKey>();
-    generateKeys();
+  }
+  
+  @VisibleForTesting
+  public void setSerialNo(int serialNo) {
+    this.serialNo = (serialNo & LOW_MASK) | (nnIndex << 31);
   }
 
   /** Initialize block keys */
@@ -101,10 +136,10 @@ public class BlockTokenSecretManager ext
      * Similarly, the estimated expiry date for nextKey is one keyUpdateInterval
      * more.
      */
-    serialNo++;
+    setSerialNo(serialNo + 1);
     currentKey = new BlockKey(serialNo, System.currentTimeMillis() + 2
         * keyUpdateInterval + tokenLifetime, generateSecret());
-    serialNo++;
+    setSerialNo(serialNo + 1);
     nextKey = new BlockKey(serialNo, System.currentTimeMillis() + 3
         * keyUpdateInterval + tokenLifetime, generateSecret());
     allKeys.put(currentKey.getKeyId(), currentKey);
@@ -135,7 +170,7 @@ public class BlockTokenSecretManager ext
   /**
    * Set block keys, only to be used in slave mode
    */
-  public synchronized void setKeys(ExportedBlockKeys exportedKeys)
+  public synchronized void addKeys(ExportedBlockKeys exportedKeys)
       throws IOException {
     if (isMaster || exportedKeys == null)
       return;
@@ -179,7 +214,7 @@ public class BlockTokenSecretManager ext
         + 2 * keyUpdateInterval + tokenLifetime, nextKey.getKey());
     allKeys.put(currentKey.getKeyId(), currentKey);
     // generate a new nextKey
-    serialNo++;
+    setSerialNo(serialNo + 1);
     nextKey = new BlockKey(serialNo, System.currentTimeMillis() + 3
         * keyUpdateInterval + tokenLifetime, generateSecret());
     allKeys.put(nextKey.getKeyId(), nextKey);
@@ -334,4 +369,20 @@ public class BlockTokenSecretManager ext
     }
     return createPassword(identifier.getBytes(), key.getKey());
   }
+  
+  @VisibleForTesting
+  public void setKeyUpdateIntervalForTesting(long millis) {
+    this.keyUpdateInterval = millis;
+  }
+
+  @VisibleForTesting
+  public void clearAllKeysForTesting() {
+    allKeys.clear();
+  }
+  
+  @VisibleForTesting
+  public int getSerialNoForTesting() {
+    return serialNo;
+  }
+  
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1358347&r1=1358346&r2=1358347&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Fri Jul  6 18:59:55 2012
@@ -88,9 +88,9 @@ class NameNodeConnector {
       LOG.info("Block token params received from NN: keyUpdateInterval="
           + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
           + blockTokenLifetime / (60 * 1000) + " min(s)");
-      this.blockTokenSecretManager = new BlockTokenSecretManager(false,
+      this.blockTokenSecretManager = new BlockTokenSecretManager(
           blockKeyUpdateInterval, blockTokenLifetime);
-      this.blockTokenSecretManager.setKeys(keys);
+      this.blockTokenSecretManager.addKeys(keys);
       /*
        * Balancer should sync its block keys with NN more frequently than NN
        * updates its block keys
@@ -193,7 +193,7 @@ class NameNodeConnector {
       try {
         while (shouldRun) {
           try {
-            blockTokenSecretManager.setKeys(namenode.getBlockKeys());
+            blockTokenSecretManager.addKeys(namenode.getBlockKeys());
           } catch (IOException e) {
             LOG.error("Failed to set keys", e);
           }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1358347&r1=1358346&r2=1358347&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Jul  6 18:59:55 2012
@@ -39,6 +39,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
@@ -301,12 +302,24 @@ public class BlockManager {
         + "=" + updateMin + " min(s), "
         + DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY
         + "=" + lifetimeMin + " min(s)");
-    return new BlockTokenSecretManager(true,
-        updateMin*60*1000L, lifetimeMin*60*1000L);
+    
+    String nsId = DFSUtil.getNamenodeNameServiceId(conf);
+    boolean isHaEnabled = HAUtil.isHAEnabled(conf, nsId);
+
+    if (isHaEnabled) {
+      String thisNnId = HAUtil.getNameNodeId(conf, nsId);
+      String otherNnId = HAUtil.getNameNodeIdOfOtherNode(conf, nsId);
+      return new BlockTokenSecretManager(updateMin*60*1000L,
+          lifetimeMin*60*1000L, thisNnId.compareTo(otherNnId) < 0 ? 0 : 1);
+    } else {
+      return new BlockTokenSecretManager(updateMin*60*1000L,
+          lifetimeMin*60*1000L, 0);
+    }
   }
 
   /** get the BlockTokenSecretManager */
-  BlockTokenSecretManager getBlockTokenSecretManager() {
+  @VisibleForTesting
+  public BlockTokenSecretManager getBlockTokenSecretManager() {
     return blockTokenSecretManager;
   }
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1358347&r1=1358346&r2=1358347&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Fri Jul  6 18:59:55 2012
@@ -334,6 +334,11 @@ class BPOfferService {
     }
     
     dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
+    // Add the initial block token secret keys to the DN's secret manager.
+    if (dn.isBlockTokenEnabled) {
+      dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(),
+          reg.getExportedKeys());
+    }
   }
 
   /**
@@ -597,7 +602,7 @@ class BPOfferService {
     case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
       LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
       if (dn.isBlockTokenEnabled) {
-        dn.blockPoolTokenSecretManager.setKeys(
+        dn.blockPoolTokenSecretManager.addKeys(
             getBlockPoolId(), 
             ((KeyUpdateCommand) cmd).getExportedKeys());
       }
@@ -628,17 +633,24 @@ class BPOfferService {
     switch(cmd.getAction()) {
     case DatanodeProtocol.DNA_REGISTER:
       // namenode requested a registration - at start or if NN lost contact
-      LOG.info("DatanodeCommand action: DNA_REGISTER");
+      LOG.info("DatanodeCommand action from standby: DNA_REGISTER");
       actor.reRegister();
-      return true;
+      break;
+    case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
+      LOG.info("DatanodeCommand action from standby: DNA_ACCESSKEYUPDATE");
+      if (dn.isBlockTokenEnabled) {
+        dn.blockPoolTokenSecretManager.addKeys(
+            getBlockPoolId(), 
+            ((KeyUpdateCommand) cmd).getExportedKeys());
+      }
+      break;
     case DatanodeProtocol.DNA_TRANSFER:
     case DatanodeProtocol.DNA_INVALIDATE:
     case DatanodeProtocol.DNA_SHUTDOWN:
     case DatanodeProtocol.DNA_RECOVERBLOCK:
-    case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
     case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
       LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction());
-      return true;   
+      break;
     default:
       LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
     }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1358347&r1=1358346&r2=1358347&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Jul  6 18:59:55 2012
@@ -252,6 +252,7 @@ public class DataNode extends Configured
   
   boolean isBlockTokenEnabled;
   BlockPoolTokenSecretManager blockPoolTokenSecretManager;
+  private boolean hasAnyBlockPoolRegistered = false;
   
   volatile DataBlockScanner blockScanner = null;
   private DirectoryScanner directoryScanner = null;
@@ -715,10 +716,19 @@ public class DataNode extends Configured
    * @param blockPoolId
    * @throws IOException
    */
-  private void registerBlockPoolWithSecretManager(DatanodeRegistration bpRegistration,
-      String blockPoolId) throws IOException {
+  private synchronized void registerBlockPoolWithSecretManager(
+      DatanodeRegistration bpRegistration, String blockPoolId) throws IOException {
     ExportedBlockKeys keys = bpRegistration.getExportedKeys();
-    isBlockTokenEnabled = keys.isBlockTokenEnabled();
+    if (!hasAnyBlockPoolRegistered) {
+      hasAnyBlockPoolRegistered = true;
+      isBlockTokenEnabled = keys.isBlockTokenEnabled();
+    } else {
+      if (isBlockTokenEnabled != keys.isBlockTokenEnabled()) {
+        throw new RuntimeException("Inconsistent configuration of block access"
+            + " tokens. Either all block pools must be configured to use block"
+            + " tokens, or none may be.");
+      }
+    }
     // TODO should we check that all federated nns are either enabled or
     // disabled?
     if (!isBlockTokenEnabled) return;
@@ -732,13 +742,9 @@ public class DataNode extends Configured
           + " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000)
           + " min(s)");
       final BlockTokenSecretManager secretMgr = 
-        new BlockTokenSecretManager(false, 0, blockTokenLifetime);
+        new BlockTokenSecretManager(0, blockTokenLifetime);
       blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr);
     }
-    
-    blockPoolTokenSecretManager.setKeys(blockPoolId,
-        bpRegistration.getExportedKeys());
-    bpRegistration.setExportedKeys(ExportedBlockKeys.DUMMY_KEYS);
   }
 
   /**
@@ -2200,6 +2206,11 @@ public class DataNode extends Configured
   public DatanodeID getDatanodeId() {
     return id;
   }
+  
+  @VisibleForTesting
+  public void clearAllBlockSecretKeys() {
+    blockPoolTokenSecretManager.clearAllKeysForTesting();
+  }
 
   /**
    * Get current value of the max balancer bandwidth in bytes per second.

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1358347&r1=1358346&r2=1358347&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Fri Jul  6 18:59:55 2012
@@ -830,6 +830,10 @@ class DataXceiver extends Receiver imple
       final Op op,
       final BlockTokenSecretManager.AccessMode mode) throws IOException {
     if (datanode.isBlockTokenEnabled) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Checking block access token for block '" + blk.getBlockId()
+            + "' with mode '" + mode + "'");
+      }
       try {
         datanode.blockPoolTokenSecretManager.checkAccess(t, null, blk, mode);
       } catch(InvalidToken e) {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSClientAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSClientAdapter.java?rev=1358347&r1=1358346&r2=1358347&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSClientAdapter.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSClientAdapter.java Fri Jul  6 18:59:55 2012
@@ -27,6 +27,10 @@ public class DFSClientAdapter {
     return dfs.dfs;
   }
   
+  public static void setDFSClient(DistributedFileSystem dfs, DFSClient client) {
+    dfs.dfs = client;
+  }
+  
   public static void stopLeaseRenewer(DistributedFileSystem dfs) throws IOException {
     try {
       dfs.dfs.leaserenewer.interruptAndJoin();

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery2.java?rev=1358347&r1=1358346&r2=1358347&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery2.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery2.java Fri Jul  6 18:59:55 2012
@@ -285,8 +285,7 @@ public class TestLeaseRecovery2 {
     LocatedBlocks locatedBlocks;
     do {
       Thread.sleep(SHORT_LEASE_PERIOD);
-      locatedBlocks = DFSClient.callGetBlockLocations(dfs.dfs.namenode,
-        filestr, 0L, size);
+      locatedBlocks = dfs.dfs.getLocatedBlocks(filestr, 0L, size);
     } while (locatedBlocks.isUnderConstruction());
     assertEquals(size, locatedBlocks.getFileLength());
 
@@ -498,8 +497,7 @@ public class TestLeaseRecovery2 {
     LocatedBlocks locatedBlocks;
     do {
       Thread.sleep(SHORT_LEASE_PERIOD);
-      locatedBlocks = DFSClient.callGetBlockLocations(dfs.dfs.namenode,
-        fileStr, 0L, size);
+      locatedBlocks = dfs.dfs.getLocatedBlocks(fileStr, 0L, size);
     } while (locatedBlocks.isUnderConstruction());
     assertEquals(size, locatedBlocks.getFileLength());
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java?rev=1358347&r1=1358346&r2=1358347&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java Fri Jul  6 18:59:55 2012
@@ -160,8 +160,8 @@ public class TestBlockToken {
   @Test
   public void testWritable() throws Exception {
     TestWritable.testWritable(new BlockTokenIdentifier());
-    BlockTokenSecretManager sm = new BlockTokenSecretManager(true,
-        blockKeyUpdateInterval, blockTokenLifetime);
+    BlockTokenSecretManager sm = new BlockTokenSecretManager(
+        blockKeyUpdateInterval, blockTokenLifetime, 0);
     TestWritable.testWritable(generateTokenId(sm, block1,
         EnumSet.allOf(BlockTokenSecretManager.AccessMode.class)));
     TestWritable.testWritable(generateTokenId(sm, block2,
@@ -199,18 +199,18 @@ public class TestBlockToken {
   /** test block key and token handling */
   @Test
   public void testBlockTokenSecretManager() throws Exception {
-    BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(true,
-        blockKeyUpdateInterval, blockTokenLifetime);
-    BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(false,
+    BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(
+        blockKeyUpdateInterval, blockTokenLifetime, 0);
+    BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(
         blockKeyUpdateInterval, blockTokenLifetime);
     ExportedBlockKeys keys = masterHandler.exportKeys();
-    slaveHandler.setKeys(keys);
+    slaveHandler.addKeys(keys);
     tokenGenerationAndVerification(masterHandler, slaveHandler);
     // key updating
     masterHandler.updateKeys();
     tokenGenerationAndVerification(masterHandler, slaveHandler);
     keys = masterHandler.exportKeys();
-    slaveHandler.setKeys(keys);
+    slaveHandler.addKeys(keys);
     tokenGenerationAndVerification(masterHandler, slaveHandler);
   }
 
@@ -236,8 +236,8 @@ public class TestBlockToken {
 
   @Test
   public void testBlockTokenRpc() throws Exception {
-    BlockTokenSecretManager sm = new BlockTokenSecretManager(true,
-        blockKeyUpdateInterval, blockTokenLifetime);
+    BlockTokenSecretManager sm = new BlockTokenSecretManager(
+        blockKeyUpdateInterval, blockTokenLifetime, 0);
     Token<BlockTokenIdentifier> token = sm.generateToken(block3,
         EnumSet.allOf(BlockTokenSecretManager.AccessMode.class));
 
@@ -271,8 +271,8 @@ public class TestBlockToken {
   @Test
   public void testBlockTokenRpcLeak() throws Exception {
     Assume.assumeTrue(FD_DIR.exists());
-    BlockTokenSecretManager sm = new BlockTokenSecretManager(true,
-        blockKeyUpdateInterval, blockTokenLifetime);
+    BlockTokenSecretManager sm = new BlockTokenSecretManager(
+        blockKeyUpdateInterval, blockTokenLifetime, 0);
     Token<BlockTokenIdentifier> token = sm.generateToken(block3,
         EnumSet.allOf(BlockTokenSecretManager.AccessMode.class));
 
@@ -340,21 +340,21 @@ public class TestBlockToken {
     // Test BlockPoolSecretManager with upto 10 block pools
     for (int i = 0; i < 10; i++) {
       String bpid = Integer.toString(i);
-      BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(true,
-          blockKeyUpdateInterval, blockTokenLifetime);
-      BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(false,
+      BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(
+          blockKeyUpdateInterval, blockTokenLifetime, 0);
+      BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(
           blockKeyUpdateInterval, blockTokenLifetime);
       bpMgr.addBlockPool(bpid, slaveHandler);
 
       ExportedBlockKeys keys = masterHandler.exportKeys();
-      bpMgr.setKeys(bpid, keys);
+      bpMgr.addKeys(bpid, keys);
       tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid));
 
       // Test key updating
       masterHandler.updateKeys();
       tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid));
       keys = masterHandler.exportKeys();
-      bpMgr.setKeys(bpid, keys);
+      bpMgr.addKeys(bpid, keys);
       tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid));
     }
   }

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java?rev=1358347&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java Fri Jul  6 18:59:55 2012
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ha.ServiceFailedException;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSClientAdapter;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestFailoverWithBlockTokensEnabled {
+  
+  private static final Path TEST_PATH = new Path("/test-path");
+  private static final String TEST_DATA = "very important text";
+  
+  private Configuration conf;
+  private MiniDFSCluster cluster;
+  
+  @Before
+  public void startCluster() throws IOException {
+    conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .numDataNodes(1)
+        .build();
+  }
+  
+  @After
+  public void shutDownCluster() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+  
+  @Test
+  public void ensureSerialNumbersNeverOverlap() {
+    BlockTokenSecretManager btsm1 = cluster.getNamesystem(0).getBlockManager()
+        .getBlockTokenSecretManager();
+    BlockTokenSecretManager btsm2 = cluster.getNamesystem(1).getBlockManager()
+        .getBlockTokenSecretManager();
+    
+    btsm1.setSerialNo(0);
+    btsm2.setSerialNo(0);
+    assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting());
+    
+    btsm1.setSerialNo(Integer.MAX_VALUE);
+    btsm2.setSerialNo(Integer.MAX_VALUE);
+    assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting());
+    
+    btsm1.setSerialNo(Integer.MIN_VALUE);
+    btsm2.setSerialNo(Integer.MIN_VALUE);
+    assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting());
+    
+    btsm1.setSerialNo(Integer.MAX_VALUE / 2);
+    btsm2.setSerialNo(Integer.MAX_VALUE / 2);
+    assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting());
+
+    btsm1.setSerialNo(Integer.MIN_VALUE / 2);
+    btsm2.setSerialNo(Integer.MIN_VALUE / 2);
+    assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting());
+  }
+  
+  @Test
+  public void ensureInvalidBlockTokensAreRejected() throws IOException,
+      URISyntaxException {
+    cluster.transitionToActive(0);
+    FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
+    
+    DFSTestUtil.writeFile(fs, TEST_PATH, TEST_DATA);
+    assertEquals(TEST_DATA, DFSTestUtil.readFile(fs, TEST_PATH));
+    
+    DFSClient dfsClient = DFSClientAdapter.getDFSClient((DistributedFileSystem) fs);
+    DFSClient spyDfsClient = Mockito.spy(dfsClient);
+    Mockito.doAnswer(
+        new Answer<LocatedBlocks>() {
+          @Override
+          public LocatedBlocks answer(InvocationOnMock arg0) throws Throwable {
+            LocatedBlocks locatedBlocks = (LocatedBlocks)arg0.callRealMethod();
+            for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
+              Token<BlockTokenIdentifier> token = lb.getBlockToken();
+              BlockTokenIdentifier id = lb.getBlockToken().decodeIdentifier();
+              // This will make the token invalid, since the password
+              // won't match anymore
+              id.setExpiryDate(System.currentTimeMillis() + 10);
+              Token<BlockTokenIdentifier> newToken =
+                  new Token<BlockTokenIdentifier>(id.getBytes(),
+                      token.getPassword(), token.getKind(), token.getService());
+              lb.setBlockToken(newToken);
+            }
+            return locatedBlocks;
+          }
+        }).when(spyDfsClient).getLocatedBlocks(Mockito.anyString(),
+            Mockito.anyLong(), Mockito.anyLong());
+    DFSClientAdapter.setDFSClient((DistributedFileSystem)fs, spyDfsClient);
+    
+    try {
+      assertEquals(TEST_DATA, DFSTestUtil.readFile(fs, TEST_PATH));
+      fail("Shouldn't have been able to read a file with invalid block tokens");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("Could not obtain block", ioe);
+    }
+  }
+  
+  @Test
+  public void testFailoverAfterRegistration() throws IOException,
+      URISyntaxException {
+    writeUsingBothNameNodes();
+  }
+  
+  @Test
+  public void TestFailoverAfterAccessKeyUpdate() throws IOException,
+      URISyntaxException, InterruptedException {
+    lowerKeyUpdateIntervalAndClearKeys(cluster);
+    // Sleep 10s to guarantee DNs heartbeat and get new keys.
+    Thread.sleep(10 * 1000);
+    writeUsingBothNameNodes();
+  }
+  
+  private void writeUsingBothNameNodes() throws ServiceFailedException,
+      IOException, URISyntaxException {
+    cluster.transitionToActive(0);
+    
+    FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
+    DFSTestUtil.writeFile(fs, TEST_PATH, TEST_DATA);
+    
+    cluster.transitionToStandby(0);
+    cluster.transitionToActive(1);
+    
+    fs.delete(TEST_PATH, false);
+    DFSTestUtil.writeFile(fs, TEST_PATH, TEST_DATA);
+  }
+  
+  private static void lowerKeyUpdateIntervalAndClearKeys(MiniDFSCluster cluster) {
+    lowerKeyUpdateIntervalAndClearKeys(cluster.getNamesystem(0));
+    lowerKeyUpdateIntervalAndClearKeys(cluster.getNamesystem(1));
+    for (DataNode dn : cluster.getDataNodes()) {
+      dn.clearAllBlockSecretKeys();
+    }
+  }
+  
+  private static void lowerKeyUpdateIntervalAndClearKeys(FSNamesystem namesystem) {
+    BlockTokenSecretManager btsm = namesystem.getBlockManager()
+        .getBlockTokenSecretManager();
+    btsm.setKeyUpdateIntervalForTesting(2 * 1000);
+    btsm.setTokenLifetime(2 * 1000);
+    btsm.clearAllKeysForTesting();
+  }
+  
+}