Posted to hdfs-commits@hadoop.apache.org by sh...@apache.org on 2009/08/19 22:51:15 UTC

svn commit: r805969 - in /hadoop/hdfs/branches/HDFS-265/src: java/org/apache/hadoop/hdfs/ java/org/apache/hadoop/hdfs/server/datanode/ java/org/apache/hadoop/hdfs/server/namenode/ test/hdfs/org/apache/hadoop/hdfs/ test/hdfs/org/apache/hadoop/hdfs/serve...

Author: shv
Date: Wed Aug 19 20:51:14 2009
New Revision: 805969

URL: http://svn.apache.org/viewvc?rev=805969&view=rev
Log:
HDFS-517. Merge -r 803338:804754 from trunk to branch HDFS-265.

Added:
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/BlockMissingException.java   (with props)
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockMissingException.java   (with props)
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestAccessTokenWithDFS.java   (with props)
Modified:
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java

Added: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/BlockMissingException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/BlockMissingException.java?rev=805969&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/BlockMissingException.java (added)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/BlockMissingException.java Wed Aug 19 20:51:14 2009
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+
+/** 
+  * This exception is thrown when a read encounters a block that has no locations
+  * associated with it.
+  */
+public class BlockMissingException extends IOException {
+
+  private static final long serialVersionUID = 1L;
+
+  private String filename;
+  private long   offset;
+
+  /**
+   * An exception that indicates that file was corrupted.
+   * @param filename name of corrupted file
+   * @param description a description of the corruption details
+   */
+  public BlockMissingException(String filename, String description, long offset) {
+    super(description);
+    this.filename = filename;
+    this.offset = offset;
+  }
+
+  /**
+   * Returns the name of the corrupted file.
+   * @return name of corrupted file
+   */
+  public String getFile() {
+    return filename;
+  }
+
+  /**
+   * Returns the offset at which this file is corrupted
+   * @return offset of corrupted file
+   */
+  public long getOffset() {
+    return offset;
+  }
+}

Propchange: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/BlockMissingException.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
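
BlockMissingException extends IOException and carries the file name and read offset, so callers can tell a genuinely unreadable block apart from other I/O failures. A minimal sketch of such a caller, assuming an already-configured FileSystem (the class name and path below are illustrative, not part of this commit):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.BlockMissingException;

    public class ReadWithMissingBlockCheck {
      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        Path file = new Path("/user/example/data.txt");   // illustrative path
        FSDataInputStream in = fs.open(file);
        byte[] buf = new byte[4096];
        try {
          int n;
          while ((n = in.read(buf)) >= 0) {
            // process n bytes ...
          }
        } catch (BlockMissingException e) {
          // Every replica of some block is unavailable; report where the read failed.
          System.err.println("Missing block in " + e.getFile()
              + " at offset " + e.getOffset());
        } finally {
          in.close();
        }
      }
    }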

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=805969&r1=805968&r2=805969&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java Wed Aug 19 20:51:14 2009
@@ -1449,11 +1449,15 @@
       if (status != SUCCESS) {
         if (status == ERROR_ACCESS_TOKEN) {
           throw new InvalidAccessTokenException(
-              "Got access token error in response to OP_READ_BLOCK "
-                  + "for file " + file + " for block " + blockId);
+              "Got access token error for OP_READ_BLOCK, self="
+                  + sock.getLocalSocketAddress() + ", remote="
+                  + sock.getRemoteSocketAddress() + ", for file " + file
+                  + ", for block " + blockId + "_" + genStamp);
         } else {
-          throw new IOException("Got error in response to OP_READ_BLOCK "
-              + "for file " + file + " for block " + blockId);
+          throw new IOException("Got error for OP_READ_BLOCK, self="
+              + sock.getLocalSocketAddress() + ", remote="
+              + sock.getRemoteSocketAddress() + ", for file " + file
+              + ", for block " + blockId + "_" + genStamp);
         }
       }
       DataChecksum checksum = DataChecksum.newDataChecksum( in );
@@ -1729,9 +1733,10 @@
               buffersize, verifyChecksum, clientName);
           return chosenNode;
         } catch (IOException ex) {
-          LOG.debug("Failed to connect to " + targetAddr + ":" 
-                    + StringUtils.stringifyException(ex));
-          if (ex instanceof InvalidAccessTokenException && refetchToken-- > 0) {
+          if (ex instanceof InvalidAccessTokenException && refetchToken > 0) {
+            LOG.info("Will fetch a new access token and retry, " 
+                + "access token was invalid when connecting to " + targetAddr
+                + " : " + ex);
             /*
              * Get a new access token and retry. Retry is needed in 2 cases. 1)
              * When both NN and DN re-started while DFSClient holding a cached
@@ -1742,8 +1747,11 @@
              * access key from its memory since it's considered expired based on
              * the estimated expiration date.
              */
+            refetchToken--;
             fetchBlockAt(target);
           } else {
+            LOG.info("Failed to connect to " + targetAddr
+                + ", add to deadNodes and continue", ex);
             // Put chosen node into dead list, continue
             addToDeadNodes(chosenNode);
           }
@@ -1898,7 +1906,8 @@
         } catch (IOException ie) {
           String blockInfo = block.getBlock() + " file=" + src;
           if (failures >= maxBlockAcquireFailures) {
-            throw new IOException("Could not obtain block: " + blockInfo);
+            throw new BlockMissingException(src, "Could not obtain block: " + blockInfo,
+                                            block.getStartOffset());
           }
           
           if (nodes == null || nodes.length == 0) {
@@ -1964,12 +1973,11 @@
                    e.getPos() + " from " + chosenNode.getName());
           reportChecksumFailure(src, block.getBlock(), chosenNode);
         } catch (IOException e) {
-          if (e instanceof InvalidAccessTokenException && refetchToken-- > 0) {
-            LOG.info("Invalid access token when connecting to " + targetAddr
-                + " for file " + src + " for block "
-                + block.getBlock() + ":"
-                + StringUtils.stringifyException(e)
-                + ", get a new access token and retry...");
+          if (e instanceof InvalidAccessTokenException && refetchToken > 0) {
+            LOG.info("Will get a new access token and retry, "
+                + "access token was invalid when connecting to " + targetAddr
+                + " : " + e);
+            refetchToken--;
             fetchBlockAt(block.getStartOffset());
             continue;
           } else {
@@ -2988,6 +2996,10 @@
         return nodes;
       }
 
+      AccessToken getAccessToken() {
+        return accessToken;
+      }
+
       private void setLastException(IOException e) {
         if (lastException == null) {
           lastException = e;
@@ -3404,6 +3416,14 @@
     long getInitialLen() {
       return initialFileSize;
     }
+
+    /**
+     * Returns the access token currently used by streamer, for testing only
+     */
+    AccessToken getAccessToken() {
+      return streamer.getAccessToken();
+    }
+
   }
 
   void reportChecksumFailure(String file, Block blk, DatanodeInfo dn) {
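
The DFSClient hunks above replace the inline "refetchToken-- > 0" test with an explicit decrement inside the retry branch, so the retry budget is consumed only when a fresh token is actually fetched, and the log message states whether the client will retry or mark the node dead. A self-contained sketch of that control flow, using stand-in names (InvalidTokenException, connectToDataNode) rather than the real DFSClient internals; note the real client adds the node to deadNodes and continues instead of rethrowing:

    import java.io.IOException;

    public class TokenRetrySketch {
      // Stand-in for InvalidAccessTokenException.
      static class InvalidTokenException extends IOException { }

      // Stand-in for blockSeekTo()/getBlockReader(); fails while the token is stale.
      static void connectToDataNode(boolean tokenValid) throws IOException {
        if (!tokenValid) {
          throw new InvalidTokenException();
        }
      }

      public static void main(String[] args) throws IOException {
        int refetchToken = 1;           // allow one transparent token refresh
        boolean tokenValid = false;     // pretend the cached token has expired
        while (true) {
          try {
            connectToDataNode(tokenValid);
            System.out.println("connected");
            break;
          } catch (InvalidTokenException e) {
            if (refetchToken > 0) {
              refetchToken--;           // consume the budget only when retrying
              tokenValid = true;        // stand-in for fetchBlockAt() refreshing the token
            } else {
              throw e;                  // budget exhausted: propagate the failure
            }
          }
        }
      }
    }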

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=805969&r1=805968&r2=805969&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Aug 19 20:51:14 2009
@@ -140,8 +140,8 @@
       try {
         ERROR_ACCESS_TOKEN.write(out);
         out.flush();
-        throw new IOException("Access token verification failed, on client "
-            + "request for reading block " + block);
+        throw new IOException("Access token verification failed, for client "
+            + remoteAddress + " for OP_READ_BLOCK for block " + block);
       } finally {
         IOUtils.closeStream(out);
       }
@@ -235,8 +235,8 @@
           Text.writeString(replyOut, datanode.dnRegistration.getName());
           replyOut.flush();
         }
-        throw new IOException("Access token verification failed, on client "
-            + "request for writing block " + block);
+        throw new IOException("Access token verification failed, for client "
+            + remoteAddress + " for OP_WRITE_BLOCK for block " + block);
       } finally {
         IOUtils.closeStream(replyOut);
       }
@@ -388,8 +388,8 @@
         ERROR_ACCESS_TOKEN.write(out);
         out.flush();
         throw new IOException(
-            "Access token verification failed, on getBlockChecksum() "
-                + "for block " + block);
+            "Access token verification failed, for client " + remoteAddress
+                + " for OP_BLOCK_CHECKSUM for block " + block);
       } finally {
         IOUtils.closeStream(out);
       }
@@ -443,7 +443,7 @@
         && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
             AccessTokenHandler.AccessMode.COPY)) {
       LOG.warn("Invalid access token in request from "
-          + s.getRemoteSocketAddress() + " for copying block " + block);
+          + remoteAddress + " for OP_COPY_BLOCK for block " + block);
       sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
       return;
     }
@@ -515,7 +515,7 @@
         && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
             AccessTokenHandler.AccessMode.REPLACE)) {
       LOG.warn("Invalid access token in request from "
-          + s.getRemoteSocketAddress() + " for replacing block " + block);
+          + remoteAddress + " for OP_REPLACE_BLOCK for block " + block);
       sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
       return;
     }

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=805969&r1=805968&r2=805969&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Wed Aug 19 20:51:14 2009
@@ -1230,8 +1230,9 @@
    */
   boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
     boolean status = false;
-    for(final Iterator<Block> i = srcNode.getBlockIterator(); i.hasNext(); ) {
-      final Block block = i.next();
+    final Iterator<? extends Block> it = srcNode.getBlockIterator();
+    while(it.hasNext()) {
+      final Block block = it.next();
       INode fileINode = blocksMap.getINode(block);
 
       if (fileINode != null) {

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=805969&r1=805968&r2=805969&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Wed Aug 19 20:51:14 2009
@@ -229,7 +229,7 @@
   /**
    * Iterates over the list of blocks belonging to the datanode.
    */
-  static private class BlockIterator implements Iterator<Block> {
+  static private class BlockIterator implements Iterator<BlockInfo> {
     private BlockInfo current;
     private DatanodeDescriptor node;
       
@@ -253,7 +253,7 @@
     }
   }
 
-  Iterator<Block> getBlockIterator() {
+  Iterator<BlockInfo> getBlockIterator() {
     return new BlockIterator(this.blockList, this);
   }
   
@@ -402,7 +402,7 @@
     }
     // collect blocks that have not been reported
     // all of them are next to the delimiter
-    Iterator<Block> it = new BlockIterator(delimiter.getNext(0), this);
+    Iterator<? extends Block> it = new BlockIterator(delimiter.getNext(0),this);
     while(it.hasNext())
       toRemove.add(it.next());
     this.removeBlock(delimiter);
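
The iterator changes here and in BlockManager above (and FSNamesystem below) hinge on the fact that Java generics are invariant: an Iterator<BlockInfo> is not an Iterator<Block> even though BlockInfo extends Block, so callers that only need the Block view now declare Iterator<? extends Block>. A small self-contained illustration of the same rule, with Base/Derived standing in for Block/BlockInfo:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class WildcardIteratorSketch {
      static class Base { }
      static class Derived extends Base { }

      public static void main(String[] args) {
        List<Derived> list = Arrays.asList(new Derived(), new Derived());

        // Iterator<Base> bad = list.iterator();        // does not compile: invariance
        Iterator<? extends Base> it = list.iterator();  // fine: read-only covariant view

        while (it.hasNext()) {
          Base b = it.next();                           // elements still usable as Base
          System.out.println(b.getClass().getSimpleName());
        }
      }
    }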

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=805969&r1=805968&r2=805969&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Aug 19 20:51:14 2009
@@ -544,7 +544,7 @@
     if(numBlocks == 0) {
       return new BlocksWithLocations(new BlockWithLocations[0]);
     }
-    Iterator<Block> iter = node.getBlockIterator();
+    Iterator<BlockInfo> iter = node.getBlockIterator();
     int startBlock = r.nextInt(numBlocks); // starting from a random block
     // skip blocks
     for(int i=0; i<startBlock; i++) {
@@ -2157,7 +2157,8 @@
       }
     }
 
-    for (Iterator<Block> it = nodeInfo.getBlockIterator(); it.hasNext();) {
+    Iterator<? extends Block> it = nodeInfo.getBlockIterator();
+    while(it.hasNext()) {
       blockManager.removeStoredBlock(it.next(), nodeInfo);
     }
     unprotectedRemoveDatanode(nodeInfo);
@@ -2657,7 +2658,7 @@
       //
       // all the blocks that reside on this node have to be 
       // replicated.
-      Iterator<Block> decommissionBlocks = node.getBlockIterator();
+      Iterator<? extends Block> decommissionBlocks = node.getBlockIterator();
       while(decommissionBlocks.hasNext()) {
         Block block = decommissionBlocks.next();
         blockManager.updateNeededReplications(block, -1, 0);

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=805969&r1=805968&r2=805969&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java Wed Aug 19 20:51:14 2009
@@ -25,6 +25,7 @@
 import java.io.IOException;
 import java.net.URL;
 import java.net.URLConnection;
+import java.util.List;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
@@ -35,7 +36,9 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.AccessToken;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -249,6 +252,15 @@
     return in.getCurrentBlock();
   }  
 
+  public static List<LocatedBlock> getAllBlocks(FSDataInputStream in)
+      throws IOException {
+    return ((DFSClient.DFSDataInputStream) in).getAllBlocks();
+  }
+
+  public static AccessToken getAccessToken(FSDataOutputStream out) {
+    return ((DFSClient.DFSOutputStream) out.getWrappedStream()).getAccessToken();
+  }
+
   static void setLogLevel2All(org.apache.commons.logging.Log log) {
     ((org.apache.commons.logging.impl.Log4JLogger)log
         ).getLogger().setLevel(org.apache.log4j.Level.ALL);
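
The two new helpers give tests a window into the client-side streams: getAllBlocks() returns the located blocks, and therefore the cached access tokens, of an open input stream, while getAccessToken() exposes the token currently held by an output stream. A compile-level sketch of how a test might use them, assuming a running cluster and an open FileSystem (the wrapper class and method name are illustrative; the real usage is in TestAccessTokenWithDFS below):

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DFSTestUtil;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;
    import org.apache.hadoop.security.SecurityTestUtil;

    public class TokenInspectionSketch {
      /** Returns true only if every token cached by the open stream has expired. */
      static boolean allCachedTokensExpired(FileSystem fs, Path file)
          throws IOException {
        FSDataInputStream in = fs.open(file);
        try {
          for (LocatedBlock blk : DFSTestUtil.getAllBlocks(in)) {
            if (!SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken())) {
              return false;
            }
          }
          return true;
        } finally {
          in.close();
        }
      }
    }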

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=805969&r1=805968&r2=805969&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Wed Aug 19 20:51:14 2009
@@ -32,6 +32,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.*;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -43,6 +44,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.security.*;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 
 /**
@@ -572,6 +574,45 @@
     }
   }
 
+  /**
+   * Shutdown namenode.
+   */
+  public synchronized void shutdownNameNode() {
+    if (nameNode != null) {
+      System.out.println("Shutting down the namenode");
+      nameNode.stop();
+      nameNode.join();
+      nameNode = null;
+    }
+  }
+
+  /**
+   * Restart namenode.
+   */
+  public synchronized void restartNameNode() throws IOException {
+    shutdownNameNode();
+    nameNode = NameNode.createNameNode(new String[] {}, conf);
+    waitClusterUp();
+    System.out.println("Restarted the namenode");
+    int failedCount = 0;
+    while (true) {
+      try {
+        waitActive();
+        break;
+      } catch (IOException e) {
+        failedCount++;
+        // Cached RPC connection to namenode, if any, is expected to fail once
+        if (failedCount > 1) {
+          System.out.println("Tried waitActive() " + failedCount
+              + " time(s) and failed, giving up.  "
+              + StringUtils.stringifyException(e));
+          throw e;
+        }
+      }
+    }
+    System.out.println("Cluster is active");
+  }
+
   /*
    * Corrupt a block on all datanode
    */
@@ -611,7 +652,7 @@
   /*
    * Shutdown a particular datanode
    */
-  public DataNodeProperties stopDataNode(int i) {
+  public synchronized DataNodeProperties stopDataNode(int i) {
     if (i < 0 || i >= dataNodes.size()) {
       return null;
     }
@@ -626,60 +667,91 @@
     return dnprop;
   }
 
+  /*
+   * Shutdown a datanode by name.
+   */
+  public synchronized DataNodeProperties stopDataNode(String name) {
+    int i;
+    for (i = 0; i < dataNodes.size(); i++) {
+      DataNode dn = dataNodes.get(i).datanode;
+      if (dn.dnRegistration.getName().equals(name)) {
+        break;
+      }
+    }
+    return stopDataNode(i);
+  }
+
   /**
    * Restart a datanode
    * @param dnprop datanode's property
    * @return true if restarting is successful
    * @throws IOException
    */
-  public synchronized boolean restartDataNode(DataNodeProperties dnprop)
-  throws IOException {
+  public boolean restartDataNode(DataNodeProperties dnprop) throws IOException {
+    return restartDataNode(dnprop, false);
+  }
+
+  /**
+   * Restart a datanode, on the same port if requested
+   * @param dnprop, the datanode to restart
+   * @param keepPort, whether to use the same port 
+   * @return true if restarting is successful
+   * @throws IOException
+   */
+  public synchronized boolean restartDataNode(DataNodeProperties dnprop,
+      boolean keepPort) throws IOException {
     Configuration conf = dnprop.conf;
     String[] args = dnprop.dnArgs;
     Configuration newconf = new Configuration(conf); // save cloned config
-    dataNodes.add(new DataNodeProperties(
-                     DataNode.createDataNode(args, conf), 
-                     newconf, args));
+    if (keepPort) {
+      InetSocketAddress addr = dnprop.datanode.getSelfAddr();
+      conf.set("dfs.datanode.address", addr.getAddress().getHostAddress() + ":"
+          + addr.getPort());
+    }
+    dataNodes.add(new DataNodeProperties(DataNode.createDataNode(args, conf),
+        newconf, args));
     numDataNodes++;
     return true;
+  }
 
+  /*
+   * Restart a particular datanode, use newly assigned port
+   */
+  public boolean restartDataNode(int i) throws IOException {
+    return restartDataNode(i, false);
   }
+
   /*
-   * Restart a particular datanode
+   * Restart a particular datanode, on the same port if keepPort is true
    */
-  public synchronized boolean restartDataNode(int i) throws IOException {
+  public synchronized boolean restartDataNode(int i, boolean keepPort)
+      throws IOException {
     DataNodeProperties dnprop = stopDataNode(i);
     if (dnprop == null) {
       return false;
     } else {
-      return restartDataNode(dnprop);
+      return restartDataNode(dnprop, keepPort);
     }
   }
 
   /*
-   * Restart all datanodes
+   * Restart all datanodes, on the same ports if keepPort is true
    */
-  public synchronized boolean restartDataNodes() throws IOException {
-    for (int i = dataNodes.size()-1; i >= 0; i--) {
-      System.out.println("Restarting DataNode " + i);
-      if (!restartDataNode(i)) 
+  public synchronized boolean restartDataNodes(boolean keepPort)
+      throws IOException {
+    for (int i = dataNodes.size() - 1; i >= 0; i--) {
+      if (!restartDataNode(i, keepPort))
         return false;
+      System.out.println("Restarted DataNode " + i);
     }
     return true;
   }
 
   /*
-   * Shutdown a datanode by name.
+   * Restart all datanodes, use newly assigned ports
    */
-  public synchronized DataNodeProperties stopDataNode(String name) {
-    int i;
-    for (i = 0; i < dataNodes.size(); i++) {
-      DataNode dn = dataNodes.get(i).datanode;
-      if (dn.dnRegistration.getName().equals(name)) {
-        break;
-      }
-    }
-    return stopDataNode(i);
+  public boolean restartDataNodes() throws IOException {
+    return restartDataNodes(false);
   }
   
   /**
@@ -739,18 +811,31 @@
                                                    getNameNodePort());
     DFSClient client = new DFSClient(addr, conf);
 
-    // make sure all datanodes are alive
-    while(client.datanodeReport(DatanodeReportType.LIVE).length
-        != numDataNodes) {
+    // make sure all datanodes have registered and sent heartbeat
+    while (shouldWait(client.datanodeReport(DatanodeReportType.LIVE))) {
       try {
         Thread.sleep(100);
-      } catch (Exception e) {
+      } catch (InterruptedException e) {
       }
     }
 
     client.close();
   }
   
+  private synchronized boolean shouldWait(DatanodeInfo[] dnInfo) {
+    if (dnInfo.length != numDataNodes) {
+      return true;
+    }
+    // make sure all datanodes have sent first heartbeat to namenode,
+    // using (capacity == 0) as proxy.
+    for (DatanodeInfo dn : dnInfo) {
+      if (dn.getCapacity() == 0) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   public void formatDataNodeDirs() throws IOException {
     base_dir = new File(getBaseDirectory());
     data_dir = new File(base_dir, "data");
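
These restart hooks are what the new access-token test leans on: restartDataNodes(true) brings every datanode back on its previous port so that tokens cached in a client remain usable, and shutdownNameNode()/restartNameNode() let a test prove the client never had to go back to the namenode for fresh tokens. A short sketch of that pattern, assuming an already-built cluster (the wrapper class and method are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.hdfs.MiniDFSCluster;

    public class RestartSketch {
      /**
       * Restart all datanodes on their old ports, then take the namenode down,
       * so any further reads must succeed using only tokens already cached
       * in the client.
       */
      static void restartDataNodesKeepingPorts(MiniDFSCluster cluster)
          throws IOException {
        if (!cluster.restartDataNodes(true)) {   // true = reuse previous ports
          throw new IOException("datanode restart failed");
        }
        cluster.waitActive();                    // wait for registration and first heartbeat
        cluster.shutdownNameNode();              // no new tokens can be fetched from here on
      }
    }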

Added: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockMissingException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockMissingException.java?rev=805969&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockMissingException.java (added)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockMissingException.java Wed Aug 19 20:51:14 2009
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.BlockMissingException;
+
+public class TestBlockMissingException extends TestCase {
+  final static Log LOG = LogFactory.getLog("org.apache.hadoop.hdfs.TestBlockMissing");
+  final static int NUM_DATANODES = 3;
+
+  Configuration conf;
+  MiniDFSCluster dfs = null;
+  DistributedFileSystem fileSys = null;
+
+  /**
+   * Test DFS Raid
+   */
+  public void testBlockMissingException() throws Exception {
+    LOG.info("Test testBlockMissingException started.");
+    long blockSize = 1024L;
+    int numBlocks = 4;
+    conf = new Configuration();
+    try {
+      dfs = new MiniDFSCluster(conf, NUM_DATANODES, true, null);
+      dfs.waitActive();
+      fileSys = (DistributedFileSystem)dfs.getFileSystem();
+      Path file1 = new Path("/user/dhruba/raidtest/file1");
+      createOldFile(fileSys, file1, 1, numBlocks, blockSize);
+
+      // extract block locations from File system. Wait till file is closed.
+      LocatedBlocks locations = null;
+      locations = fileSys.dfs.getNamenode().getBlockLocations(file1.toString(),
+                                                             0, numBlocks * blockSize);
+      // remove block of file
+      LOG.info("Remove first block of file");
+      corruptBlock(file1, locations.get(0).getBlock());
+
+      // validate that the system throws BlockMissingException
+      validateFile(fileSys, file1);
+    } finally {
+      if (fileSys != null) fileSys.close();
+      if (dfs != null) dfs.shutdown();
+    }
+    LOG.info("Test testBlockMissingException completed.");
+  }
+  
+  //
+  // creates a file and populate it with data.
+  //
+  private void createOldFile(FileSystem fileSys, Path name, int repl, int numBlocks, long blocksize)
+    throws IOException {
+    FSDataOutputStream stm = fileSys.create(name, true,
+                                            fileSys.getConf().getInt("io.file.buffer.size", 4096),
+                                            (short)repl, blocksize);
+    // fill data into file
+    final byte[] b = new byte[(int)blocksize];
+    for (int i = 0; i < numBlocks; i++) {
+      stm.write(b);
+    }
+    stm.close();
+  }
+
+  //
+  // validates that file encounters BlockMissingException
+  //
+  private void validateFile(FileSystem fileSys, Path name)
+    throws IOException {
+
+    FSDataInputStream stm = fileSys.open(name);
+    final byte[] b = new byte[4192];
+    int num = 0;
+    boolean gotException = false;
+
+    try {
+      while (num >= 0) {
+        num = stm.read(b);
+        if (num < 0) {
+          break;
+        }
+      }
+    } catch (BlockMissingException e) {
+      gotException = true;
+    }
+    stm.close();
+    assertTrue("Expected BlockMissingException ", gotException);
+  }
+
+  /*
+   * The Data directories for a datanode
+   */
+  private File[] getDataNodeDirs(int i) throws IOException {
+    File base_dir = new File(System.getProperty("test.build.data"), "dfs/");
+    File data_dir = new File(base_dir, "data");
+    File dir1 = new File(data_dir, "data"+(2*i+1));
+    File dir2 = new File(data_dir, "data"+(2*i+2));
+    if (dir1.isDirectory() && dir2.isDirectory()) {
+      File[] dir = new File[2];
+      dir[0] = new File(dir1, "current");
+      dir[1] = new File(dir2, "current"); 
+      return dir;
+    }
+    return new File[0];
+  }
+
+  //
+  // Corrupt specified block of file
+  //
+  void corruptBlock(Path file, Block blockNum) throws IOException {
+    long id = blockNum.getBlockId();
+
+    // Now deliberately remove/truncate data blocks from the block.
+    //
+    for (int i = 0; i < NUM_DATANODES; i++) {
+      File[] dirs = getDataNodeDirs(i);
+      
+      for (int j = 0; j < dirs.length; j++) {
+        File[] blocks = dirs[j].listFiles();
+        assertTrue("Blocks do not exist in data-dir", (blocks != null) && (blocks.length >= 0));
+        for (int idx = 0; idx < blocks.length; idx++) {
+          if (blocks[idx].getName().startsWith("blk_" + id) &&
+              !blocks[idx].getName().endsWith(".meta")) {
+            blocks[idx].delete();
+            LOG.info("Deleted block " + blocks[idx]);
+          }
+        }
+      }
+    }
+  }
+
+}

Propchange: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockMissingException.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java?rev=805969&r1=805968&r2=805969&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java Wed Aug 19 20:51:14 2009
@@ -34,7 +34,6 @@
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.AccessTokenHandler;
 
 import junit.framework.TestCase;
 /**
@@ -60,7 +59,6 @@
   }
 
   private void initConf(Configuration conf) {
-    conf.setBoolean(AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, false);
     conf.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
     conf.setInt("io.bytes.per.checksum", DEFAULT_BLOCK_SIZE);
     conf.setLong("dfs.heartbeat.interval", 1L);
@@ -259,23 +257,32 @@
     } while(!balanced);
 
   }
+  
+  /** one-node cluster test*/
+  private void oneNodeTest(Configuration conf) throws Exception {
+    // add an empty node with half of the CAPACITY & the same rack
+    test(conf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, RACK0);
+  }
+  
+  /** two-node cluster test */
+  private void twoNodeTest(Configuration conf) throws Exception {
+    test(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+        CAPACITY, RACK2);
+  }
+  
+  /** test using a user-supplied conf */
+  public void integrationTest(Configuration conf) throws Exception {
+    initConf(conf);
+    oneNodeTest(conf);
+  }
+  
   /** Test a cluster with even distribution, 
    * then a new empty node is added to the cluster*/
   public void testBalancer0() throws Exception {
     Configuration conf = new Configuration();
     initConf(conf);
-    /** one-node cluster test*/
-    // add an empty node with half of the CAPACITY & the same rack
-    test(conf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, RACK0);
-
-    /** two-node cluster test */
-    test(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
-        CAPACITY, RACK2);
-    
-    /** End-to-end testing of access token, involving NN, DN, and Balancer */
-    Configuration newConf = new Configuration(conf);
-    newConf.setBoolean(AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, true);
-    test(newConf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, RACK0);
+    oneNodeTest(conf);
+    twoNodeTest(conf);
   }
 
   /** Test unevenly distributed cluster */

Added: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestAccessTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestAccessTokenWithDFS.java?rev=805969&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestAccessTokenWithDFS.java (added)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestAccessTokenWithDFS.java Wed Aug 19 20:51:14 2009
@@ -0,0 +1,531 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.AccessToken;
+import org.apache.hadoop.security.AccessTokenHandler;
+import org.apache.hadoop.security.InvalidAccessTokenException;
+import org.apache.hadoop.security.SecurityTestUtil;
+import org.apache.log4j.Level;
+
+import junit.framework.TestCase;
+
+public class TestAccessTokenWithDFS extends TestCase {
+
+  private static final int BLOCK_SIZE = 1024;
+  private static final int FILE_SIZE = 2 * BLOCK_SIZE;
+  private static final String FILE_TO_READ = "/fileToRead.dat";
+  private static final String FILE_TO_WRITE = "/fileToWrite.dat";
+  private static final String FILE_TO_APPEND = "/fileToAppend.dat";
+  private final byte[] rawData = new byte[FILE_SIZE];
+
+  {
+    ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
+    Random r = new Random();
+    r.nextBytes(rawData);
+  }
+
+  private void createFile(FileSystem fs, Path filename) throws IOException {
+    FSDataOutputStream out = fs.create(filename);
+    out.write(rawData);
+    out.close();
+  }
+
+  // read a file using blockSeekTo()
+  private boolean checkFile1(FSDataInputStream in) {
+    byte[] toRead = new byte[FILE_SIZE];
+    int totalRead = 0;
+    int nRead = 0;
+    try {
+      while ((nRead = in.read(toRead, totalRead, toRead.length - totalRead)) > 0) {
+        totalRead += nRead;
+      }
+    } catch (IOException e) {
+      return false;
+    }
+    assertEquals("Cannot read file.", toRead.length, totalRead);
+    return checkFile(toRead);
+  }
+
+  // read a file using fetchBlockByteRange()
+  private boolean checkFile2(FSDataInputStream in) {
+    byte[] toRead = new byte[FILE_SIZE];
+    try {
+      assertEquals("Cannot read file", toRead.length, in.read(0, toRead, 0,
+          toRead.length));
+    } catch (IOException e) {
+      return false;
+    }
+    return checkFile(toRead);
+  }
+
+  private boolean checkFile(byte[] fileToCheck) {
+    if (fileToCheck.length != rawData.length) {
+      return false;
+    }
+    for (int i = 0; i < fileToCheck.length; i++) {
+      if (fileToCheck[i] != rawData[i]) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  // creates a file and returns a descriptor for writing to it
+  private static FSDataOutputStream writeFile(FileSystem fileSys, Path name,
+      short repl, long blockSize) throws IOException {
+    FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf()
+        .getInt("io.file.buffer.size", 4096), repl, blockSize);
+    return stm;
+  }
+
+  // try reading a block using a BlockReader directly
+  private static void tryRead(Configuration conf, LocatedBlock lblock,
+      boolean shouldSucceed) {
+    InetSocketAddress targetAddr = null;
+    Socket s = null;
+    DFSClient.BlockReader blockReader = null;
+    Block block = lblock.getBlock();
+    try {
+      DatanodeInfo[] nodes = lblock.getLocations();
+      targetAddr = NetUtils.createSocketAddr(nodes[0].getName());
+      s = new Socket();
+      s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
+      s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
+
+      blockReader = DFSClient.BlockReader.newBlockReader(s, targetAddr
+          .toString()
+          + ":" + block.getBlockId(), block.getBlockId(), lblock
+          .getAccessToken(), block.getGenerationStamp(), 0, -1, conf.getInt(
+          "io.file.buffer.size", 4096));
+
+    } catch (IOException ex) {
+      if (ex instanceof InvalidAccessTokenException) {
+        assertFalse("OP_READ_BLOCK: access token is invalid, "
+            + "when it is expected to be valid", shouldSucceed);
+        return;
+      }
+      fail("OP_READ_BLOCK failed due to reasons other than access token");
+    } finally {
+      if (s != null) {
+        try {
+          s.close();
+        } catch (IOException iex) {
+        } finally {
+          s = null;
+        }
+      }
+    }
+    if (blockReader == null) {
+      fail("OP_READ_BLOCK failed due to reasons other than access token");
+    }
+    assertTrue("OP_READ_BLOCK: access token is valid, "
+        + "when it is expected to be invalid", shouldSucceed);
+  }
+
+  // get a conf for testing
+  private static Configuration getConf(int numDataNodes) throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, true);
+    conf.setLong("dfs.block.size", BLOCK_SIZE);
+    conf.setInt("io.bytes.per.checksum", BLOCK_SIZE);
+    conf.setInt("dfs.heartbeat.interval", 1);
+    conf.setInt("dfs.replication", numDataNodes);
+    conf.setInt("ipc.client.connect.max.retries", 0);
+    conf.setBoolean("dfs.support.append", true);
+    return conf;
+  }
+
+  /*
+   * testing that APPEND operation can handle token expiration when
+   * re-establishing pipeline is needed
+   */
+  public void testAppend() throws Exception {
+    MiniDFSCluster cluster = null;
+    int numDataNodes = 2;
+    Configuration conf = getConf(numDataNodes);
+
+    try {
+      cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+      cluster.waitActive();
+      assertEquals(numDataNodes, cluster.getDataNodes().size());
+      // set a short token lifetime (1 second)
+      SecurityTestUtil.setAccessTokenLifetime(
+          cluster.getNamesystem().accessTokenHandler, 1000L);
+      Path fileToAppend = new Path(FILE_TO_APPEND);
+      FileSystem fs = cluster.getFileSystem();
+
+      // write a one-byte file
+      FSDataOutputStream stm = writeFile(fs, fileToAppend,
+          (short) numDataNodes, BLOCK_SIZE);
+      stm.write(rawData, 0, 1);
+      stm.close();
+      // open the file again for append
+      stm = fs.append(fileToAppend);
+      int mid = rawData.length - 1;
+      stm.write(rawData, 1, mid - 1);
+      stm.sync();
+
+      /*
+       * wait till token used in stm expires
+       */
+      AccessToken token = DFSTestUtil.getAccessToken(stm);
+      while (!SecurityTestUtil.isAccessTokenExpired(token)) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException ignored) {
+        }
+      }
+
+      // remove a datanode to force re-establishing pipeline
+      cluster.stopDataNode(0);
+      // append the rest of the file
+      stm.write(rawData, mid, rawData.length - mid);
+      stm.close();
+      // check if append is successful
+      FSDataInputStream in5 = fs.open(fileToAppend);
+      assertTrue(checkFile1(in5));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /*
+   * testing that WRITE operation can handle token expiration when
+   * re-establishing pipeline is needed
+   */
+  public void testWrite() throws Exception {
+    MiniDFSCluster cluster = null;
+    int numDataNodes = 2;
+    Configuration conf = getConf(numDataNodes);
+
+    try {
+      cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+      cluster.waitActive();
+      assertEquals(numDataNodes, cluster.getDataNodes().size());
+      // set a short token lifetime (1 second)
+      SecurityTestUtil.setAccessTokenLifetime(
+          cluster.getNamesystem().accessTokenHandler, 1000L);
+      Path fileToWrite = new Path(FILE_TO_WRITE);
+      FileSystem fs = cluster.getFileSystem();
+
+      FSDataOutputStream stm = writeFile(fs, fileToWrite, (short) numDataNodes,
+          BLOCK_SIZE);
+      // write a partial block
+      int mid = rawData.length - 1;
+      stm.write(rawData, 0, mid);
+      stm.sync();
+
+      /*
+       * wait till token used in stm expires
+       */
+      AccessToken token = DFSTestUtil.getAccessToken(stm);
+      while (!SecurityTestUtil.isAccessTokenExpired(token)) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException ignored) {
+        }
+      }
+
+      // remove a datanode to force re-establishing pipeline
+      cluster.stopDataNode(0);
+      // write the rest of the file
+      stm.write(rawData, mid, rawData.length - mid);
+      stm.close();
+      // check if write is successful
+      FSDataInputStream in4 = fs.open(fileToWrite);
+      assertTrue(checkFile1(in4));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  public void testRead() throws Exception {
+    MiniDFSCluster cluster = null;
+    int numDataNodes = 2;
+    Configuration conf = getConf(numDataNodes);
+
+    try {
+      cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+      cluster.waitActive();
+      assertEquals(numDataNodes, cluster.getDataNodes().size());
+      // set a short token lifetime (1 second) initially
+      SecurityTestUtil.setAccessTokenLifetime(
+          cluster.getNamesystem().accessTokenHandler, 1000L);
+      Path fileToRead = new Path(FILE_TO_READ);
+      FileSystem fs = cluster.getFileSystem();
+      createFile(fs, fileToRead);
+
+      /*
+       * setup for testing expiration handling of cached tokens
+       */
+
+      // read using blockSeekTo(). Acquired tokens are cached in in1
+      FSDataInputStream in1 = fs.open(fileToRead);
+      assertTrue(checkFile1(in1));
+      // read using blockSeekTo(). Acquired tokens are cached in in2
+      FSDataInputStream in2 = fs.open(fileToRead);
+      assertTrue(checkFile1(in2));
+      // read using fetchBlockByteRange(). Acquired tokens are cached in in3
+      FSDataInputStream in3 = fs.open(fileToRead);
+      assertTrue(checkFile2(in3));
+
+      /*
+       * testing READ interface on DN using a BlockReader
+       */
+
+      DFSClient dfsclient = new DFSClient(new InetSocketAddress("localhost",
+          cluster.getNameNodePort()), conf);
+      List<LocatedBlock> locatedBlocks = dfsclient.getNamenode().getBlockLocations(
+          FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks();
+      LocatedBlock lblock = locatedBlocks.get(0); // first block
+      AccessToken myToken = lblock.getAccessToken();
+      // verify token is not expired
+      assertFalse(SecurityTestUtil.isAccessTokenExpired(myToken));
+      // read with valid token, should succeed
+      tryRead(conf, lblock, true);
+
+      /*
+       * wait till myToken and all cached tokens in in1, in2 and in3 expire
+       */
+
+      while (!SecurityTestUtil.isAccessTokenExpired(myToken)) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException ignored) {
+        }
+      }
+
+      /*
+       * continue testing READ interface on DN using a BlockReader
+       */
+
+      // verify token is expired
+      assertTrue(SecurityTestUtil.isAccessTokenExpired(myToken));
+      // read should fail
+      tryRead(conf, lblock, false);
+      // use a valid new token
+      lblock.setAccessToken(cluster.getNamesystem().accessTokenHandler
+          .generateToken(lblock.getBlock().getBlockId(), EnumSet
+              .of(AccessTokenHandler.AccessMode.READ)));
+      // read should succeed
+      tryRead(conf, lblock, true);
+      // use a token with wrong blockID
+      lblock.setAccessToken(cluster.getNamesystem().accessTokenHandler
+          .generateToken(lblock.getBlock().getBlockId() + 1, EnumSet
+              .of(AccessTokenHandler.AccessMode.READ)));
+      // read should fail
+      tryRead(conf, lblock, false);
+      // use a token with wrong access modes
+      lblock.setAccessToken(cluster.getNamesystem().accessTokenHandler
+          .generateToken(lblock.getBlock().getBlockId(), EnumSet.of(
+              AccessTokenHandler.AccessMode.WRITE,
+              AccessTokenHandler.AccessMode.COPY,
+              AccessTokenHandler.AccessMode.REPLACE)));
+      // read should fail
+      tryRead(conf, lblock, false);
+
+      // set a long token lifetime for future tokens
+      SecurityTestUtil.setAccessTokenLifetime(
+          cluster.getNamesystem().accessTokenHandler, 600 * 1000L);
+
+      /*
+       * testing that when cached tokens are expired, DFSClient will re-fetch
+       * tokens transparently for READ.
+       */
+
+      // confirm all tokens cached in in1 are expired by now
+      List<LocatedBlock> lblocks = DFSTestUtil.getAllBlocks(in1);
+      for (LocatedBlock blk : lblocks) {
+        assertTrue(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
+      }
+      // verify blockSeekTo() is able to re-fetch token transparently
+      in1.seek(0);
+      assertTrue(checkFile1(in1));
+
+      // confirm all tokens cached in in2 are expired by now
+      List<LocatedBlock> lblocks2 = DFSTestUtil.getAllBlocks(in2);
+      for (LocatedBlock blk : lblocks2) {
+        assertTrue(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
+      }
+      // verify blockSeekTo() is able to re-fetch token transparently (testing
+      // via another interface method)
+      assertTrue(in2.seekToNewSource(0));
+      assertTrue(checkFile1(in2));
+
+      // confirm all tokens cached in in3 are expired by now
+      List<LocatedBlock> lblocks3 = DFSTestUtil.getAllBlocks(in3);
+      for (LocatedBlock blk : lblocks3) {
+        assertTrue(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
+      }
+      // verify fetchBlockByteRange() is able to re-fetch token transparently
+      assertTrue(checkFile2(in3));
+
+      /*
+       * testing that after datanodes are restarted on the same ports, cached
+       * tokens should still work and there is no need to fetch new tokens from
+       * namenode. This test should run while namenode is down (to make sure no
+       * new tokens can be fetched from namenode).
+       */
+
+      // restart datanodes on the same ports that they currently use
+      assertTrue(cluster.restartDataNodes(true));
+      cluster.waitActive();
+      assertEquals(numDataNodes, cluster.getDataNodes().size());
+      cluster.shutdownNameNode();
+
+      // confirm tokens cached in in1 are still valid
+      lblocks = DFSTestUtil.getAllBlocks(in1);
+      for (LocatedBlock blk : lblocks) {
+        assertFalse(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
+      }
+      // verify blockSeekTo() still works (forced to use cached tokens)
+      in1.seek(0);
+      assertTrue(checkFile1(in1));
+
+      // confirm tokens cached in in2 are still valid
+      lblocks2 = DFSTestUtil.getAllBlocks(in2);
+      for (LocatedBlock blk : lblocks2) {
+        assertFalse(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
+      }
+      // verify blockSeekTo() still works (forced to use cached tokens)
+      in2.seekToNewSource(0);
+      assertTrue(checkFile1(in2));
+
+      // confirm tokens cached in in3 are still valid
+      lblocks3 = DFSTestUtil.getAllBlocks(in3);
+      for (LocatedBlock blk : lblocks3) {
+        assertFalse(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
+      }
+      // verify fetchBlockByteRange() still works (forced to use cached tokens)
+      assertTrue(checkFile2(in3));
+
+      /*
+       * testing that when namenode is restarted, cached tokens should still
+       * work and there is no need to fetch new tokens from namenode. Like the
+       * previous test, this test should also run while namenode is down. The
+       * setup for this test depends on the previous test.
+       */
+
+      // restart the namenode and then shut it down for test
+      cluster.restartNameNode();
+      cluster.shutdownNameNode();
+
+      // verify blockSeekTo() still works (forced to use cached tokens)
+      in1.seek(0);
+      assertTrue(checkFile1(in1));
+      // verify again blockSeekTo() still works (forced to use cached tokens)
+      in2.seekToNewSource(0);
+      assertTrue(checkFile1(in2));
+      // verify fetchBlockByteRange() still works (forced to use cached tokens)
+      assertTrue(checkFile2(in3));
+
+      /*
+       * testing that after both namenode and datanodes got restarted (namenode
+       * first, followed by datanodes), DFSClient can't access DN without
+       * re-fetching tokens and is able to re-fetch tokens transparently. The
+       * setup of this test depends on the previous test.
+       */
+
+      // restore the cluster and restart the datanodes for test
+      cluster.restartNameNode();
+      assertTrue(cluster.restartDataNodes(true));
+      cluster.waitActive();
+      assertEquals(numDataNodes, cluster.getDataNodes().size());
+
+      // shutdown namenode so that DFSClient can't get new tokens from namenode
+      cluster.shutdownNameNode();
+
+      // verify blockSeekTo() fails (cached tokens become invalid)
+      in1.seek(0);
+      assertFalse(checkFile1(in1));
+      // verify fetchBlockByteRange() fails (cached tokens become invalid)
+      assertFalse(checkFile2(in3));
+
+      // restart the namenode to allow DFSClient to re-fetch tokens
+      cluster.restartNameNode();
+      // verify blockSeekTo() works again (by transparently re-fetching
+      // tokens from namenode)
+      in1.seek(0);
+      assertTrue(checkFile1(in1));
+      in2.seekToNewSource(0);
+      assertTrue(checkFile1(in2));
+      // verify fetchBlockByteRange() works again (by transparently
+      // re-fetching tokens from namenode)
+      assertTrue(checkFile2(in3));
+
+      /*
+       * testing that when datanodes are restarted on different ports, DFSClient
+       * is able to re-fetch tokens transparently to connect to them
+       */
+
+      // restart datanodes on newly assigned ports
+      assertTrue(cluster.restartDataNodes(false));
+      cluster.waitActive();
+      assertEquals(numDataNodes, cluster.getDataNodes().size());
+      // verify blockSeekTo() is able to re-fetch token transparently
+      in1.seek(0);
+      assertTrue(checkFile1(in1));
+      // verify blockSeekTo() is able to re-fetch token transparently
+      in2.seekToNewSource(0);
+      assertTrue(checkFile1(in2));
+      // verify fetchBlockByteRange() is able to re-fetch token transparently
+      assertTrue(checkFile2(in3));
+
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /*
+   * Integration testing of access token, involving NN, DN, and Balancer
+   */
+  public void testEnd2End() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, true);
+    new TestBalancer().integrationTest(conf);
+  }
+}

Propchange: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestAccessTokenWithDFS.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java?rev=805969&r1=805968&r2=805969&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java Wed Aug 19 20:51:14 2009
@@ -99,7 +99,7 @@
       // check if excessive replica is detected
       do {
        num = namesystem.blockManager.countNodes(block);
-      } while (num.excessReplicas() == 2);
+      } while (num.excessReplicas() != 2);
     } finally {
       cluster.shutdown();
     }