You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ma...@apache.org on 2011/08/26 21:14:33 UTC

svn commit: r1162214 - in /hadoop/common/branches/branch-0.20-security: CHANGES.txt src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java src/test/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java

Author: mattf
Date: Fri Aug 26 19:14:33 2011
New Revision: 1162214

URL: http://svn.apache.org/viewvc?rev=1162214&view=rev
Log:
HADOOP-6889. Unit tests updated to 17/Aug/2011 version. Contributed by John George.

Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java

Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1162214&r1=1162213&r2=1162214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Fri Aug 26 19:14:33 2011
@@ -72,7 +72,8 @@ Release 0.20.205.0 - unreleased
     priority (Robert Joseph Evans via mahadev)
 
     HADOOP-6889. Make RPC to have an option to timeout - backport to 
-    0.20-security. (John George and Ravi Prakash via mattf)
+    0.20-security. Unit tests updated to 17/Aug/2011 version.
+    (John George and Ravi Prakash via mattf)
 
     MAPREDUCE-2780. Use a utility method to set service in token. 
     (Daryn Sharp via jitendra)

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=1162214&r1=1162213&r2=1162214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java Fri Aug 26 19:14:33 2011
@@ -17,11 +17,17 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.net.SocketTimeoutException;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.LongWritable;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
+import java.net.SocketTimeoutException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,6 +37,9 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.hdfs.DFSClient.DFSInputStream;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+
 import org.apache.hadoop.hdfs.server.common.*;
 import org.apache.hadoop.hdfs.server.datanode.TestInterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
@@ -41,6 +50,11 @@ import org.apache.hadoop.security.Access
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 
 import junit.framework.TestCase;
@@ -53,9 +67,51 @@ import org.mockito.invocation.Invocation
  * properly in case of errors.
  */
 public class TestDFSClientRetries extends TestCase {
+  private static final String ADDRESS = "0.0.0.0";
+  final static private int PING_INTERVAL = 1000;
+  final static private int MIN_SLEEP_TIME = 1000;
   public static final Log LOG =
     LogFactory.getLog(TestDFSClientRetries.class.getName());
-  
+  final static private Configuration conf = new Configuration();
+ 
+ private static class TestServer extends Server {
+    private boolean sleep;
+    private Class<? extends Writable> responseClass;
+
+    public TestServer(int handlerCount, boolean sleep) throws IOException {
+      this(handlerCount, sleep, LongWritable.class, null);
+    }
+
+    public TestServer(int handlerCount, boolean sleep,
+        Class<? extends Writable> paramClass,
+        Class<? extends Writable> responseClass)
+      throws IOException {
+      super(ADDRESS, 0, paramClass, handlerCount, conf);
+      this.sleep = sleep;
+      this.responseClass = responseClass;
+    }
+
+    @Override
+    public Writable call(Class<?> protocol, Writable param, long receiveTime)
+        throws IOException {
+      if (sleep) {
+        // sleep a bit
+        try {
+          Thread.sleep(PING_INTERVAL + MIN_SLEEP_TIME);
+        } catch (InterruptedException e) {}
+      }
+      if (responseClass != null) {
+        try {
+          return responseClass.newInstance();
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      } else {
+        return param;                               // echo param as result
+      }
+    }
+  }
+ 
   // writes 'len' bytes of data to out.
   private static void writeData(OutputStream out, int len) throws IOException {
     byte [] buf = new byte[4096*16];
@@ -72,7 +128,6 @@ public class TestDFSClientRetries extend
    */
   public void testWriteTimeoutAtDataNode() throws IOException,
                                                   InterruptedException { 
-    Configuration conf = new Configuration();
     
     final int writeTimeout = 100; //milliseconds.
     // set a very short write timeout for datanode, so that tests runs fast.
@@ -245,8 +300,6 @@ public class TestDFSClientRetries extend
   
   public void testNotYetReplicatedErrors() throws IOException
   {   
-    Configuration conf = new Configuration();
-    
     // allow 1 retry (2 total calls)
     conf.setInt("dfs.client.block.write.locateFollowingBlock.retries", 1);
         
@@ -273,7 +326,6 @@ public class TestDFSClientRetries extend
     long fileSize = 4096;
     Path file = new Path("/testFile");
 
-    Configuration conf = new Configuration();
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
 
     int maxBlockAcquires = DFSClient.getMaxBlockAcquireFailures(conf);
@@ -383,42 +435,36 @@ public class TestDFSClientRetries extend
     }
   }
   
-  
-  /**
-   * The following test first creates a file.
-   * It verifies the block information from a datanode.
-   * Then, it stops the DN and observes timeout on connection attempt. 
+  /** Test that timeout occurs when DN does not respond to RPC.
+   * Start up a server and ask it to sleep for n seconds. Make an
+   * RPC to the server and set rpcTimeout to less than n and ensure
+   * that socketTimeoutException is obtained
    */
-  public void testDFSClientTimeout() throws Exception {
-    Configuration conf = new Configuration();
-    MiniDFSCluster cluster = null;
+  public void testClientDNProtocolTimeout() throws IOException {
+    final Server server = new TestServer(1, true);
+    server.start();
+
+    final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    DatanodeID fakeDnId = new DatanodeID(
+        "localhost:" + addr.getPort(), "fake-storage", 0, addr.getPort());
+    DatanodeInfo dnInfo = new DatanodeInfo(fakeDnId);
+
+    LocatedBlock fakeBlock = new LocatedBlock(new Block(12345L), new DatanodeInfo[0]);
+
+    ClientDatanodeProtocol proxy = null;
 
     try {
-      cluster = new MiniDFSCluster(conf, 3, true, null);
-      cluster.waitActive();
+      proxy = DFSClient.createClientDatanodeProtocolProxy(dnInfo, conf,
+          fakeBlock.getBlock(), fakeBlock.getBlockToken(), 500);
 
-      //create a file
-      DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
-      String filestr = "/foo";
-      Path filepath = new Path(filestr);
-      DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L);
-      assertTrue(dfs.getClient().exists(filestr));
-
-      //get block info
-      LocatedBlock locatedblock = TestInterDatanodeProtocol.getLastLocatedBlock(dfs.getClient().namenode, filestr);
-      DatanodeInfo[] datanodeinfo = locatedblock.getLocations();
-      assertTrue(datanodeinfo.length > 0);
-
-      //shutdown a data node
-      cluster.stopDataNode(datanodeinfo[0].getName());
-      DFSClient.createClientDatanodeProtocolProxy(datanodeinfo[0], conf, 
-        locatedblock.getBlock(), locatedblock.getBlockToken(), 500);
-      fail("Expected an exception to have been thrown"); 
-    } catch (IOException e) {
-      DFSClient.LOG.info("Got a SocketTimeoutException ", e);
+      fail ("Did not get expected exception: SocketTimeoutException");
+    } catch (SocketTimeoutException e) {
+      LOG.info("Got the expected Exception: SocketTimeoutException");
     } finally {
-      if (cluster != null) {cluster.shutdown();}
+      if (proxy != null) {
+        RPC.stopProxy(proxy);
+      }
+      server.stop();
     }
   }
-  
 }

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java?rev=1162214&r1=1162213&r2=1162214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java Fri Aug 26 19:14:33 2011
@@ -19,6 +19,20 @@ package org.apache.hadoop.hdfs.server.da
 
 import java.io.IOException;
 import java.util.List;
+import java.net.InetSocketAddress;
+ 
+import java.net.SocketTimeoutException;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
@@ -34,11 +48,55 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 
 /**
  * This tests InterDataNodeProtocol for block handling. 
  */
 public class TestInterDatanodeProtocol extends junit.framework.TestCase {
+ private static final String ADDRESS = "0.0.0.0";
+  final static private int PING_INTERVAL = 1000;
+  final static private int MIN_SLEEP_TIME = 1000;
+  private static Configuration conf = new Configuration();
+
+  private static class TestServer extends Server {
+    private boolean sleep;
+    private Class<? extends Writable> responseClass;
+
+    public TestServer(int handlerCount, boolean sleep) throws IOException {
+      this(handlerCount, sleep, LongWritable.class, null);
+    }
+
+    public TestServer(int handlerCount, boolean sleep,
+        Class<? extends Writable> paramClass,
+        Class<? extends Writable> responseClass)
+      throws IOException {
+      super(ADDRESS, 0, paramClass, handlerCount, conf);
+      this.sleep = sleep;
+      this.responseClass = responseClass;
+    }
+
+    @Override
+    public Writable call(Class<?> protocol, Writable param, long receiveTime)
+        throws IOException {
+      if (sleep) {
+        // sleep a bit
+        try {
+          Thread.sleep(PING_INTERVAL + MIN_SLEEP_TIME);
+        } catch (InterruptedException e) {}
+      }
+      if (responseClass != null) {
+        try {
+          return responseClass.newInstance();
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      } else {
+        return param;                               // echo param as result
+      }
+    }
+  }
+
   public static void checkMetaInfo(Block b, InterDatanodeProtocol idp,
       DataBlockScanner scanner) throws IOException {
     BlockMetaDataInfo metainfo = idp.getBlockMetaDataInfo(b);
@@ -68,7 +126,6 @@ public class TestInterDatanodeProtocol e
    * Then, it updates the block with new information and verifies again. 
    */
   public void testBlockMetaDataInfo() throws Exception {
-    Configuration conf = new Configuration();
     MiniDFSCluster cluster = null;
 
     try {
@@ -112,41 +169,30 @@ public class TestInterDatanodeProtocol e
     }
   }
 
-  /**
-   * The following test first creates a file.
-   * It verifies the block information from a datanode.
-   * Then, it stops the DN and observes timeout on connection attempt. 
+  /** Test to verify that InterDatanode RPC timesout as expected when
+   *  the server DN does not respond.
    */
   public void testInterDNProtocolTimeout() throws Exception {
-    Configuration conf = new Configuration();
-    MiniDFSCluster cluster = null;
+    final Server server = new TestServer(1, true);
+    server.start();
 
-    try {
-      cluster = new MiniDFSCluster(conf, 3, true, null);
-      cluster.waitActive();
+    final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    DatanodeID fakeDnId = new DatanodeID(
+        "localhost:" + addr.getPort(), "fake-storage", 0, addr.getPort());
+    DatanodeInfo dInfo = new DatanodeInfo(fakeDnId);
+    InterDatanodeProtocol proxy = null;
 
-      //create a file
-      DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
-      String filestr = "/foo";
-      Path filepath = new Path(filestr);
-      DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L);
-      assertTrue(dfs.getClient().exists(filestr));
-
-      //get block info
-      LocatedBlock locatedblock = getLastLocatedBlock(dfs.getClient().namenode, filestr);
-      DatanodeInfo[] datanodeinfo = locatedblock.getLocations();
-      assertTrue(datanodeinfo.length > 0);
-
-      //shutdown a data node
-      cluster.stopDataNode(datanodeinfo[0].getName());
-      InterDatanodeProtocol idp = DataNode.createInterDataNodeProtocolProxy(
-          datanodeinfo[0], conf, 500);
-      fail("Expected an exception to have been thrown"); 
-    } catch (IOException e) {
-      InterDatanodeProtocol.LOG.info("Got a SocketTimeoutException ", e);
+    try {
+      proxy = DataNode.createInterDataNodeProtocolProxy(
+          dInfo, conf, 500);
+      fail ("Expected SocketTimeoutException exception, but did not get.");
+    } catch (SocketTimeoutException e) {
+      DataNode.LOG.info("Got expected Exception: SocketTimeoutException");
     } finally {
-      if (cluster != null) {cluster.shutdown();}
+      if (proxy != null) {
+        RPC.stopProxy(proxy);
+      }
+      server.stop();
     }
   }
-  
 }