You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2009/08/12 19:30:27 UTC

svn commit: r803622 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/hdfs/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/ src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/

Author: szetszwo
Date: Wed Aug 12 17:30:27 2009
New Revision: 803622

URL: http://svn.apache.org/viewvc?rev=803622&view=rev
Log:
HDFS-409. Add more access token tests.  Contributed by Kan Zhang

Added:
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestAccessTokenWithDFS.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=803622&r1=803621&r2=803622&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Wed Aug 12 17:30:27 2009
@@ -92,6 +92,8 @@
     HDFS-451. Add fault injection tests, Pipeline_Fi_06,07,14,15, for
     DataTransferProtocol.  (szetszwo)
 
+    HDFS-409. Add more access token tests.  (Kan Zhang via szetszwo)
+
   BUG FIXES
 
     HDFS-76. Better error message to users when commands fail because of 

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=803622&r1=803621&r2=803622&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Wed Aug 12 17:30:27 2009
@@ -1449,11 +1449,15 @@
       if (status != SUCCESS) {
         if (status == ERROR_ACCESS_TOKEN) {
           throw new InvalidAccessTokenException(
-              "Got access token error in response to OP_READ_BLOCK "
-                  + "for file " + file + " for block " + blockId);
+              "Got access token error for OP_READ_BLOCK, self="
+                  + sock.getLocalSocketAddress() + ", remote="
+                  + sock.getRemoteSocketAddress() + ", for file " + file
+                  + ", for block " + blockId + "_" + genStamp);
         } else {
-          throw new IOException("Got error in response to OP_READ_BLOCK "
-              + "for file " + file + " for block " + blockId);
+          throw new IOException("Got error for OP_READ_BLOCK, self="
+              + sock.getLocalSocketAddress() + ", remote="
+              + sock.getRemoteSocketAddress() + ", for file " + file
+              + ", for block " + blockId + "_" + genStamp);
         }
       }
       DataChecksum checksum = DataChecksum.newDataChecksum( in );
@@ -1729,9 +1733,10 @@
               buffersize, verifyChecksum, clientName);
           return chosenNode;
         } catch (IOException ex) {
-          LOG.debug("Failed to connect to " + targetAddr + ":" 
-                    + StringUtils.stringifyException(ex));
-          if (ex instanceof InvalidAccessTokenException && refetchToken-- > 0) {
+          if (ex instanceof InvalidAccessTokenException && refetchToken > 0) {
+            LOG.info("Will fetch a new access token and retry, " 
+                + "access token was invalid when connecting to " + targetAddr
+                + " : " + ex);
             /*
              * Get a new access token and retry. Retry is needed in 2 cases. 1)
              * When both NN and DN re-started while DFSClient holding a cached
@@ -1742,8 +1747,11 @@
              * access key from its memory since it's considered expired based on
              * the estimated expiration date.
              */
+            refetchToken--;
             fetchBlockAt(target);
           } else {
+            LOG.info("Failed to connect to " + targetAddr
+                + ", add to deadNodes and continue", ex);
             // Put chosen node into dead list, continue
             addToDeadNodes(chosenNode);
           }
@@ -1964,12 +1972,11 @@
                    e.getPos() + " from " + chosenNode.getName());
           reportChecksumFailure(src, block.getBlock(), chosenNode);
         } catch (IOException e) {
-          if (e instanceof InvalidAccessTokenException && refetchToken-- > 0) {
-            LOG.info("Invalid access token when connecting to " + targetAddr
-                + " for file " + src + " for block "
-                + block.getBlock() + ":"
-                + StringUtils.stringifyException(e)
-                + ", get a new access token and retry...");
+          if (e instanceof InvalidAccessTokenException && refetchToken > 0) {
+            LOG.info("Will get a new access token and retry, "
+                + "access token was invalid when connecting to " + targetAddr
+                + " : " + e);
+            refetchToken--;
             fetchBlockAt(block.getStartOffset());
             continue;
           } else {
@@ -2982,6 +2989,10 @@
         return nodes;
       }
 
+      AccessToken getAccessToken() {
+        return accessToken;
+      }
+
       private void setLastException(IOException e) {
         if (lastException == null) {
           lastException = e;
@@ -3386,6 +3397,14 @@
     long getInitialLen() {
       return initialFileSize;
     }
+
+    /**
+     * Returns the access token currently used by streamer, for testing only
+     */
+    AccessToken getAccessToken() {
+      return streamer.getAccessToken();
+    }
+
   }
 
   void reportChecksumFailure(String file, Block blk, DatanodeInfo dn) {

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=803622&r1=803621&r2=803622&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Aug 12 17:30:27 2009
@@ -140,8 +140,8 @@
       try {
         ERROR_ACCESS_TOKEN.write(out);
         out.flush();
-        throw new IOException("Access token verification failed, on client "
-            + "request for reading block " + block);
+        throw new IOException("Access token verification failed, for client "
+            + remoteAddress + " for OP_READ_BLOCK for block " + block);
       } finally {
         IOUtils.closeStream(out);
       }
@@ -235,8 +235,8 @@
           Text.writeString(replyOut, datanode.dnRegistration.getName());
           replyOut.flush();
         }
-        throw new IOException("Access token verification failed, on client "
-            + "request for writing block " + block);
+        throw new IOException("Access token verification failed, for client "
+            + remoteAddress + " for OP_WRITE_BLOCK for block " + block);
       } finally {
         IOUtils.closeStream(replyOut);
       }
@@ -388,8 +388,8 @@
         ERROR_ACCESS_TOKEN.write(out);
         out.flush();
         throw new IOException(
-            "Access token verification failed, on getBlockChecksum() "
-                + "for block " + block);
+            "Access token verification failed, for client " + remoteAddress
+                + " for OP_BLOCK_CHECKSUM for block " + block);
       } finally {
         IOUtils.closeStream(out);
       }
@@ -443,7 +443,7 @@
         && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
             AccessTokenHandler.AccessMode.COPY)) {
       LOG.warn("Invalid access token in request from "
-          + s.getRemoteSocketAddress() + " for copying block " + block);
+          + remoteAddress + " for OP_COPY_BLOCK for block " + block);
       sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
       return;
     }
@@ -515,7 +515,7 @@
         && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
             AccessTokenHandler.AccessMode.REPLACE)) {
       LOG.warn("Invalid access token in request from "
-          + s.getRemoteSocketAddress() + " for replacing block " + block);
+          + remoteAddress + " for OP_REPLACE_BLOCK for block " + block);
       sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
       return;
     }

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=803622&r1=803621&r2=803622&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java Wed Aug 12 17:30:27 2009
@@ -25,6 +25,7 @@
 import java.io.IOException;
 import java.net.URL;
 import java.net.URLConnection;
+import java.util.List;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
@@ -35,7 +36,9 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.AccessToken;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -249,6 +252,15 @@
     return in.getCurrentBlock();
   }  
 
+  public static List<LocatedBlock> getAllBlocks(FSDataInputStream in)
+      throws IOException {
+    return ((DFSClient.DFSDataInputStream) in).getAllBlocks();
+  }
+
+  public static AccessToken getAccessToken(FSDataOutputStream out) {
+    return ((DFSClient.DFSOutputStream) out.getWrappedStream()).getAccessToken();
+  }
+
   static void setLogLevel2All(org.apache.commons.logging.Log log) {
     ((org.apache.commons.logging.impl.Log4JLogger)log
         ).getLogger().setLevel(org.apache.log4j.Level.ALL);

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=803622&r1=803621&r2=803622&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Wed Aug 12 17:30:27 2009
@@ -32,6 +32,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.*;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -43,6 +44,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.security.*;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 
 /**
@@ -572,6 +574,45 @@
     }
   }
 
+  /**
+   * Shutdown namenode.
+   */
+  public synchronized void shutdownNameNode() {
+    if (nameNode != null) {
+      System.out.println("Shutting down the namenode");
+      nameNode.stop();
+      nameNode.join();
+      nameNode = null;
+    }
+  }
+
+  /**
+   * Restart namenode.
+   */
+  public synchronized void restartNameNode() throws IOException {
+    shutdownNameNode();
+    nameNode = NameNode.createNameNode(new String[] {}, conf);
+    waitClusterUp();
+    System.out.println("Restarted the namenode");
+    int failedCount = 0;
+    while (true) {
+      try {
+        waitActive();
+        break;
+      } catch (IOException e) {
+        failedCount++;
+        // Cached RPC connection to namenode, if any, is expected to fail once
+        if (failedCount > 1) {
+          System.out.println("Tried waitActive() " + failedCount
+              + " time(s) and failed, giving up.  "
+              + StringUtils.stringifyException(e));
+          throw e;
+        }
+      }
+    }
+    System.out.println("Cluster is active");
+  }
+
   /*
    * Corrupt a block on all datanode
    */
@@ -611,7 +652,7 @@
   /*
    * Shutdown a particular datanode
    */
-  public DataNodeProperties stopDataNode(int i) {
+  public synchronized DataNodeProperties stopDataNode(int i) {
     if (i < 0 || i >= dataNodes.size()) {
       return null;
     }
@@ -626,60 +667,91 @@
     return dnprop;
   }
 
+  /*
+   * Shutdown a datanode by name.
+   */
+  public synchronized DataNodeProperties stopDataNode(String name) {
+    int i;
+    for (i = 0; i < dataNodes.size(); i++) {
+      DataNode dn = dataNodes.get(i).datanode;
+      if (dn.dnRegistration.getName().equals(name)) {
+        break;
+      }
+    }
+    return stopDataNode(i);
+  }
+
   /**
    * Restart a datanode
    * @param dnprop datanode's property
    * @return true if restarting is successful
    * @throws IOException
    */
-  public synchronized boolean restartDataNode(DataNodeProperties dnprop)
-  throws IOException {
+  public boolean restartDataNode(DataNodeProperties dnprop) throws IOException {
+    return restartDataNode(dnprop, false);
+  }
+
+  /**
+   * Restart a datanode, on the same port if requested
+   * @param dnprop the datanode to restart
+   * @param keepPort whether to use the same port
+   * @return true if restarting is successful
+   * @throws IOException
+   */
+  public synchronized boolean restartDataNode(DataNodeProperties dnprop,
+      boolean keepPort) throws IOException {
     Configuration conf = dnprop.conf;
     String[] args = dnprop.dnArgs;
     Configuration newconf = new Configuration(conf); // save cloned config
-    dataNodes.add(new DataNodeProperties(
-                     DataNode.createDataNode(args, conf), 
-                     newconf, args));
+    if (keepPort) {
+      InetSocketAddress addr = dnprop.datanode.getSelfAddr();
+      conf.set("dfs.datanode.address", addr.getAddress().getHostAddress() + ":"
+          + addr.getPort());
+    }
+    dataNodes.add(new DataNodeProperties(DataNode.createDataNode(args, conf),
+        newconf, args));
     numDataNodes++;
     return true;
+  }
 
+  /*
+   * Restart a particular datanode, use newly assigned port
+   */
+  public boolean restartDataNode(int i) throws IOException {
+    return restartDataNode(i, false);
   }
+
   /*
-   * Restart a particular datanode
+   * Restart a particular datanode, on the same port if keepPort is true
    */
-  public synchronized boolean restartDataNode(int i) throws IOException {
+  public synchronized boolean restartDataNode(int i, boolean keepPort)
+      throws IOException {
     DataNodeProperties dnprop = stopDataNode(i);
     if (dnprop == null) {
       return false;
     } else {
-      return restartDataNode(dnprop);
+      return restartDataNode(dnprop, keepPort);
     }
   }
 
   /*
-   * Restart all datanodes
+   * Restart all datanodes, on the same ports if keepPort is true
    */
-  public synchronized boolean restartDataNodes() throws IOException {
-    for (int i = dataNodes.size()-1; i >= 0; i--) {
-      System.out.println("Restarting DataNode " + i);
-      if (!restartDataNode(i)) 
+  public synchronized boolean restartDataNodes(boolean keepPort)
+      throws IOException {
+    for (int i = dataNodes.size() - 1; i >= 0; i--) {
+      if (!restartDataNode(i, keepPort))
         return false;
+      System.out.println("Restarted DataNode " + i);
     }
     return true;
   }
 
   /*
-   * Shutdown a datanode by name.
+   * Restart all datanodes, use newly assigned ports
    */
-  public synchronized DataNodeProperties stopDataNode(String name) {
-    int i;
-    for (i = 0; i < dataNodes.size(); i++) {
-      DataNode dn = dataNodes.get(i).datanode;
-      if (dn.dnRegistration.getName().equals(name)) {
-        break;
-      }
-    }
-    return stopDataNode(i);
+  public boolean restartDataNodes() throws IOException {
+    return restartDataNodes(false);
   }
   
   /**
@@ -739,18 +811,31 @@
                                                    getNameNodePort());
     DFSClient client = new DFSClient(addr, conf);
 
-    // make sure all datanodes are alive
-    while(client.datanodeReport(DatanodeReportType.LIVE).length
-        != numDataNodes) {
+    // make sure all datanodes have registered and sent heartbeat
+    while (shouldWait(client.datanodeReport(DatanodeReportType.LIVE))) {
       try {
         Thread.sleep(100);
-      } catch (Exception e) {
+      } catch (InterruptedException e) {
       }
     }
 
     client.close();
   }
   
+  private synchronized boolean shouldWait(DatanodeInfo[] dnInfo) {
+    if (dnInfo.length != numDataNodes) {
+      return true;
+    }
+    // make sure all datanodes have sent first heartbeat to namenode,
+    // using (capacity == 0) as proxy.
+    for (DatanodeInfo dn : dnInfo) {
+      if (dn.getCapacity() == 0) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   public void formatDataNodeDirs() throws IOException {
     base_dir = new File(getBaseDirectory());
     data_dir = new File(base_dir, "data");

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java?rev=803622&r1=803621&r2=803622&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java Wed Aug 12 17:30:27 2009
@@ -34,7 +34,6 @@
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.AccessTokenHandler;
 
 import junit.framework.TestCase;
 /**
@@ -60,7 +59,6 @@
   }
 
   private void initConf(Configuration conf) {
-    conf.setBoolean(AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, false);
     conf.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
     conf.setInt("io.bytes.per.checksum", DEFAULT_BLOCK_SIZE);
     conf.setLong("dfs.heartbeat.interval", 1L);
@@ -259,23 +257,32 @@
     } while(!balanced);
 
   }
+  
+  /** one-node cluster test*/
+  private void oneNodeTest(Configuration conf) throws Exception {
+    // add an empty node with half of the CAPACITY & the same rack
+    test(conf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, RACK0);
+  }
+  
+  /** two-node cluster test */
+  private void twoNodeTest(Configuration conf) throws Exception {
+    test(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+        CAPACITY, RACK2);
+  }
+  
+  /** test using a user-supplied conf */
+  public void integrationTest(Configuration conf) throws Exception {
+    initConf(conf);
+    oneNodeTest(conf);
+  }
+  
   /** Test a cluster with even distribution, 
    * then a new empty node is added to the cluster*/
   public void testBalancer0() throws Exception {
     Configuration conf = new Configuration();
     initConf(conf);
-    /** one-node cluster test*/
-    // add an empty node with half of the CAPACITY & the same rack
-    test(conf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, RACK0);
-
-    /** two-node cluster test */
-    test(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
-        CAPACITY, RACK2);
-    
-    /** End-to-end testing of access token, involving NN, DN, and Balancer */
-    Configuration newConf = new Configuration(conf);
-    newConf.setBoolean(AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, true);
-    test(newConf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, RACK0);
+    oneNodeTest(conf);
+    twoNodeTest(conf);
   }
 
   /** Test unevenly distributed cluster */

Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestAccessTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestAccessTokenWithDFS.java?rev=803622&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestAccessTokenWithDFS.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestAccessTokenWithDFS.java Wed Aug 12 17:30:27 2009
@@ -0,0 +1,531 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.AccessToken;
+import org.apache.hadoop.security.AccessTokenHandler;
+import org.apache.hadoop.security.InvalidAccessTokenException;
+import org.apache.hadoop.security.SecurityTestUtil;
+import org.apache.log4j.Level;
+
+import junit.framework.TestCase;
+
+public class TestAccessTokenWithDFS extends TestCase {
+
+  private static final int BLOCK_SIZE = 1024;
+  private static final int FILE_SIZE = 2 * BLOCK_SIZE;
+  private static final String FILE_TO_READ = "/fileToRead.dat";
+  private static final String FILE_TO_WRITE = "/fileToWrite.dat";
+  private static final String FILE_TO_APPEND = "/fileToAppend.dat";
+  private final byte[] rawData = new byte[FILE_SIZE];
+
+  {
+    ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
+    Random r = new Random();
+    r.nextBytes(rawData);
+  }
+
+  private void createFile(FileSystem fs, Path filename) throws IOException {
+    FSDataOutputStream out = fs.create(filename);
+    out.write(rawData);
+    out.close();
+  }
+
+  // read a file using blockSeekTo()
+  private boolean checkFile1(FSDataInputStream in) {
+    byte[] toRead = new byte[FILE_SIZE];
+    int totalRead = 0;
+    int nRead = 0;
+    try {
+      while ((nRead = in.read(toRead, totalRead, toRead.length - totalRead)) > 0) {
+        totalRead += nRead;
+      }
+    } catch (IOException e) {
+      return false;
+    }
+    assertEquals("Cannot read file.", toRead.length, totalRead);
+    return checkFile(toRead);
+  }
+
+  // read a file using fetchBlockByteRange()
+  private boolean checkFile2(FSDataInputStream in) {
+    byte[] toRead = new byte[FILE_SIZE];
+    try {
+      assertEquals("Cannot read file", toRead.length, in.read(0, toRead, 0,
+          toRead.length));
+    } catch (IOException e) {
+      return false;
+    }
+    return checkFile(toRead);
+  }
+
+  private boolean checkFile(byte[] fileToCheck) {
+    if (fileToCheck.length != rawData.length) {
+      return false;
+    }
+    for (int i = 0; i < fileToCheck.length; i++) {
+      if (fileToCheck[i] != rawData[i]) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  // creates a file and returns a descriptor for writing to it
+  private static FSDataOutputStream writeFile(FileSystem fileSys, Path name,
+      short repl, long blockSize) throws IOException {
+    FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf()
+        .getInt("io.file.buffer.size", 4096), repl, blockSize);
+    return stm;
+  }
+
+  // try reading a block using a BlockReader directly
+  private static void tryRead(Configuration conf, LocatedBlock lblock,
+      boolean shouldSucceed) {
+    InetSocketAddress targetAddr = null;
+    Socket s = null;
+    DFSClient.BlockReader blockReader = null;
+    Block block = lblock.getBlock();
+    try {
+      DatanodeInfo[] nodes = lblock.getLocations();
+      targetAddr = NetUtils.createSocketAddr(nodes[0].getName());
+      s = new Socket();
+      s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
+      s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
+
+      blockReader = DFSClient.BlockReader.newBlockReader(s, targetAddr
+          .toString()
+          + ":" + block.getBlockId(), block.getBlockId(), lblock
+          .getAccessToken(), block.getGenerationStamp(), 0, -1, conf.getInt(
+          "io.file.buffer.size", 4096));
+
+    } catch (IOException ex) {
+      if (ex instanceof InvalidAccessTokenException) {
+        assertFalse("OP_READ_BLOCK: access token is invalid, "
+            + "when it is expected to be valid", shouldSucceed);
+        return;
+      }
+      fail("OP_READ_BLOCK failed due to reasons other than access token");
+    } finally {
+      if (s != null) {
+        try {
+          s.close();
+        } catch (IOException iex) {
+        } finally {
+          s = null;
+        }
+      }
+    }
+    if (blockReader == null) {
+      fail("OP_READ_BLOCK failed due to reasons other than access token");
+    }
+    assertTrue("OP_READ_BLOCK: access token is valid, "
+        + "when it is expected to be invalid", shouldSucceed);
+  }
+
+  // get a conf for testing
+  private static Configuration getConf(int numDataNodes) throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, true);
+    conf.setLong("dfs.block.size", BLOCK_SIZE);
+    conf.setInt("io.bytes.per.checksum", BLOCK_SIZE);
+    conf.setInt("dfs.heartbeat.interval", 1);
+    conf.setInt("dfs.replication", numDataNodes);
+    conf.setInt("ipc.client.connect.max.retries", 0);
+    conf.setBoolean("dfs.support.append", true);
+    return conf;
+  }
+
+  /*
+   * testing that APPEND operation can handle token expiration when
+   * re-establishing pipeline is needed
+   */
+  public void testAppend() throws Exception {
+    MiniDFSCluster cluster = null;
+    int numDataNodes = 2;
+    Configuration conf = getConf(numDataNodes);
+
+    try {
+      cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+      cluster.waitActive();
+      assertEquals(numDataNodes, cluster.getDataNodes().size());
+      // set a short token lifetime (1 second)
+      SecurityTestUtil.setAccessTokenLifetime(
+          cluster.getNamesystem().accessTokenHandler, 1000L);
+      Path fileToAppend = new Path(FILE_TO_APPEND);
+      FileSystem fs = cluster.getFileSystem();
+
+      // write a one-byte file
+      FSDataOutputStream stm = writeFile(fs, fileToAppend,
+          (short) numDataNodes, BLOCK_SIZE);
+      stm.write(rawData, 0, 1);
+      stm.close();
+      // open the file again for append
+      stm = fs.append(fileToAppend);
+      int mid = rawData.length - 1;
+      stm.write(rawData, 1, mid - 1);
+      stm.sync();
+
+      /*
+       * wait till token used in stm expires
+       */
+      AccessToken token = DFSTestUtil.getAccessToken(stm);
+      while (!SecurityTestUtil.isAccessTokenExpired(token)) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException ignored) {
+        }
+      }
+
+      // remove a datanode to force re-establishing pipeline
+      cluster.stopDataNode(0);
+      // append the rest of the file
+      stm.write(rawData, mid, rawData.length - mid);
+      stm.close();
+      // check if append is successful
+      FSDataInputStream in5 = fs.open(fileToAppend);
+      assertTrue(checkFile1(in5));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /*
+   * testing that WRITE operation can handle token expiration when
+   * re-establishing pipeline is needed
+   */
+  public void testWrite() throws Exception {
+    MiniDFSCluster cluster = null;
+    int numDataNodes = 2;
+    Configuration conf = getConf(numDataNodes);
+
+    try {
+      cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+      cluster.waitActive();
+      assertEquals(numDataNodes, cluster.getDataNodes().size());
+      // set a short token lifetime (1 second)
+      SecurityTestUtil.setAccessTokenLifetime(
+          cluster.getNamesystem().accessTokenHandler, 1000L);
+      Path fileToWrite = new Path(FILE_TO_WRITE);
+      FileSystem fs = cluster.getFileSystem();
+
+      FSDataOutputStream stm = writeFile(fs, fileToWrite, (short) numDataNodes,
+          BLOCK_SIZE);
+      // write a partial block
+      int mid = rawData.length - 1;
+      stm.write(rawData, 0, mid);
+      stm.sync();
+
+      /*
+       * wait till token used in stm expires
+       */
+      AccessToken token = DFSTestUtil.getAccessToken(stm);
+      while (!SecurityTestUtil.isAccessTokenExpired(token)) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException ignored) {
+        }
+      }
+
+      // remove a datanode to force re-establishing pipeline
+      cluster.stopDataNode(0);
+      // write the rest of the file
+      stm.write(rawData, mid, rawData.length - mid);
+      stm.close();
+      // check if write is successful
+      FSDataInputStream in4 = fs.open(fileToWrite);
+      assertTrue(checkFile1(in4));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  public void testRead() throws Exception { // READ path: token expiry, DN-side checks, re-fetch
+    MiniDFSCluster cluster = null;
+    int numDataNodes = 2;
+    Configuration conf = getConf(numDataNodes);
+
+    try {
+      cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+      cluster.waitActive();
+      assertEquals(numDataNodes, cluster.getDataNodes().size());
+      // set a short token lifetime (1 second) initially
+      SecurityTestUtil.setAccessTokenLifetime(
+          cluster.getNamesystem().accessTokenHandler, 1000L);
+      Path fileToRead = new Path(FILE_TO_READ);
+      FileSystem fs = cluster.getFileSystem();
+      createFile(fs, fileToRead);
+
+      /*
+       * setup for testing expiration handling of cached tokens
+       */
+
+      // read using blockSeekTo(). Acquired tokens are cached in in1
+      FSDataInputStream in1 = fs.open(fileToRead);
+      assertTrue(checkFile1(in1));
+      // read using blockSeekTo(). Acquired tokens are cached in in2
+      FSDataInputStream in2 = fs.open(fileToRead);
+      assertTrue(checkFile1(in2));
+      // read using fetchBlockByteRange(). Acquired tokens are cached in in3
+      FSDataInputStream in3 = fs.open(fileToRead);
+      assertTrue(checkFile2(in3));
+
+      /*
+       * testing READ interface on DN using a BlockReader
+       */
+
+      DFSClient dfsclient = new DFSClient(new InetSocketAddress("localhost",
+          cluster.getNameNodePort()), conf);
+      List<LocatedBlock> locatedBlocks = dfsclient.getNamenode().getBlockLocations(
+          FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks();
+      LocatedBlock lblock = locatedBlocks.get(0); // first block
+      AccessToken myToken = lblock.getAccessToken();
+      // verify token is not expired
+      assertFalse(SecurityTestUtil.isAccessTokenExpired(myToken));
+      // read with valid token, should succeed
+      tryRead(conf, lblock, true);
+
+      /*
+       * wait till myToken and all cached tokens in in1, in2 and in3 expire
+       */
+
+      while (!SecurityTestUtil.isAccessTokenExpired(myToken)) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException ignored) { // deliberately ignored; keep polling
+        }
+      }
+
+      /*
+       * continue testing READ interface on DN using a BlockReader
+       */
+
+      // verify token is expired
+      assertTrue(SecurityTestUtil.isAccessTokenExpired(myToken));
+      // read should fail
+      tryRead(conf, lblock, false);
+      // use a valid new token
+      lblock.setAccessToken(cluster.getNamesystem().accessTokenHandler
+          .generateToken(lblock.getBlock().getBlockId(), EnumSet
+              .of(AccessTokenHandler.AccessMode.READ)));
+      // read should succeed
+      tryRead(conf, lblock, true);
+      // use a READ token generated for a wrong block ID (the real block ID + 1)
+      lblock.setAccessToken(cluster.getNamesystem().accessTokenHandler
+          .generateToken(lblock.getBlock().getBlockId() + 1, EnumSet
+              .of(AccessTokenHandler.AccessMode.READ)));
+      // read should fail
+      tryRead(conf, lblock, false);
+      // use a token with wrong access modes (WRITE, COPY and REPLACE, but not READ)
+      lblock.setAccessToken(cluster.getNamesystem().accessTokenHandler
+          .generateToken(lblock.getBlock().getBlockId(), EnumSet.of(
+              AccessTokenHandler.AccessMode.WRITE,
+              AccessTokenHandler.AccessMode.COPY,
+              AccessTokenHandler.AccessMode.REPLACE)));
+      // read should fail
+      tryRead(conf, lblock, false);
+
+      // set a long token lifetime (600 seconds) for future tokens
+      SecurityTestUtil.setAccessTokenLifetime(
+          cluster.getNamesystem().accessTokenHandler, 600 * 1000L);
+
+      /*
+       * testing that when cached tokens are expired, DFSClient will re-fetch
+       * tokens transparently for READ.
+       */
+
+      // confirm all tokens cached in in1 are expired by now
+      List<LocatedBlock> lblocks = DFSTestUtil.getAllBlocks(in1);
+      for (LocatedBlock blk : lblocks) {
+        assertTrue(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
+      }
+      // verify blockSeekTo() is able to re-fetch token transparently
+      in1.seek(0);
+      assertTrue(checkFile1(in1));
+
+      // confirm all tokens cached in in2 are expired by now
+      List<LocatedBlock> lblocks2 = DFSTestUtil.getAllBlocks(in2);
+      for (LocatedBlock blk : lblocks2) {
+        assertTrue(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
+      }
+      // verify blockSeekTo() is able to re-fetch token transparently (testing
+      // via another interface method)
+      assertTrue(in2.seekToNewSource(0));
+      assertTrue(checkFile1(in2));
+
+      // confirm all tokens cached in in3 are expired by now
+      List<LocatedBlock> lblocks3 = DFSTestUtil.getAllBlocks(in3);
+      for (LocatedBlock blk : lblocks3) {
+        assertTrue(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
+      }
+      // verify fetchBlockByteRange() is able to re-fetch token transparently
+      assertTrue(checkFile2(in3));
+
+      /*
+       * testing that after datanodes are restarted on the same ports, cached
+       * tokens should still work and there is no need to fetch new tokens from
+       * namenode. This test should run while namenode is down (to make sure no
+       * new tokens can be fetched from namenode).
+       */
+
+      // restart datanodes on the same ports that they currently use
+      assertTrue(cluster.restartDataNodes(true));
+      cluster.waitActive();
+      assertEquals(numDataNodes, cluster.getDataNodes().size());
+      cluster.shutdownNameNode();
+
+      // confirm tokens cached in in1 are still valid
+      lblocks = DFSTestUtil.getAllBlocks(in1);
+      for (LocatedBlock blk : lblocks) {
+        assertFalse(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
+      }
+      // verify blockSeekTo() still works (forced to use cached tokens)
+      in1.seek(0);
+      assertTrue(checkFile1(in1));
+
+      // confirm tokens cached in in2 are still valid
+      lblocks2 = DFSTestUtil.getAllBlocks(in2);
+      for (LocatedBlock blk : lblocks2) {
+        assertFalse(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
+      }
+      // verify blockSeekTo() still works via seekToNewSource() (forced to use cached tokens)
+      in2.seekToNewSource(0); // NOTE(review): result not asserted here, unlike the first call
+      assertTrue(checkFile1(in2));
+
+      // confirm tokens cached in in3 are still valid
+      lblocks3 = DFSTestUtil.getAllBlocks(in3);
+      for (LocatedBlock blk : lblocks3) {
+        assertFalse(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
+      }
+      // verify fetchBlockByteRange() still works (forced to use cached tokens)
+      assertTrue(checkFile2(in3));
+
+      /*
+       * testing that when namenode is restarted, cached tokens should still
+       * work and there is no need to fetch new tokens from namenode. Like the
+       * previous test, this test should also run while namenode is down. The
+       * setup for this test depends on the previous test.
+       */
+
+      // restart the namenode and then shut it down for test
+      cluster.restartNameNode();
+      cluster.shutdownNameNode();
+
+      // verify blockSeekTo() still works (forced to use cached tokens)
+      in1.seek(0);
+      assertTrue(checkFile1(in1));
+      // verify again, via seekToNewSource(), that blockSeekTo() still works (cached tokens)
+      in2.seekToNewSource(0);
+      assertTrue(checkFile1(in2));
+      // verify fetchBlockByteRange() still works (forced to use cached tokens)
+      assertTrue(checkFile2(in3));
+
+      /*
+       * testing that after both namenode and datanodes got restarted (namenode
+       * first, followed by datanodes), DFSClient can't access DN without
+       * re-fetching tokens and is able to re-fetch tokens transparently. The
+       * setup of this test depends on the previous test.
+       */
+
+      // restore the cluster and restart the datanodes for test
+      cluster.restartNameNode();
+      assertTrue(cluster.restartDataNodes(true));
+      cluster.waitActive();
+      assertEquals(numDataNodes, cluster.getDataNodes().size());
+
+      // shutdown namenode so that DFSClient can't get new tokens from namenode
+      cluster.shutdownNameNode();
+
+      // verify blockSeekTo() fails (cached tokens become invalid)
+      in1.seek(0);
+      assertFalse(checkFile1(in1));
+      // verify fetchBlockByteRange() fails (cached tokens become invalid)
+      assertFalse(checkFile2(in3));
+
+      // restart the namenode to allow DFSClient to re-fetch tokens
+      cluster.restartNameNode();
+      // verify blockSeekTo() works again (by transparently re-fetching
+      // tokens from namenode)
+      in1.seek(0);
+      assertTrue(checkFile1(in1));
+      in2.seekToNewSource(0);
+      assertTrue(checkFile1(in2));
+      // verify fetchBlockByteRange() works again (by transparently
+      // re-fetching tokens from namenode)
+      assertTrue(checkFile2(in3));
+
+      /*
+       * testing that when datanodes are restarted on different ports, DFSClient
+       * is able to re-fetch tokens transparently to connect to them
+       */
+
+      // restart datanodes on newly assigned ports
+      assertTrue(cluster.restartDataNodes(false));
+      cluster.waitActive();
+      assertEquals(numDataNodes, cluster.getDataNodes().size());
+      // verify blockSeekTo() is able to re-fetch token transparently
+      in1.seek(0);
+      assertTrue(checkFile1(in1));
+      // verify blockSeekTo() is able to re-fetch token transparently (via seekToNewSource())
+      in2.seekToNewSource(0);
+      assertTrue(checkFile1(in2));
+      // verify fetchBlockByteRange() is able to re-fetch token transparently
+      assertTrue(checkFile2(in3));
+
+    } finally { // NOTE(review): in1, in2, in3 and dfsclient are never closed in this method
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /*
+   * Integration test of access tokens involving NN, DN, and Balancer: runs TestBalancer's integrationTest() with access tokens enabled in the configuration
+   */
+  public void testEnd2End() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, true);
+    new TestBalancer().integrationTest(conf);
+  }
+}

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java?rev=803622&r1=803621&r2=803622&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java Wed Aug 12 17:30:27 2009
@@ -99,7 +99,7 @@
       // check if excessive replica is detected
       do {
        num = namesystem.blockManager.countNodes(block);
-      } while (num.excessReplicas() == 2);
+      } while (num.excessReplicas() != 2);
     } finally {
       cluster.shutdown();
     }