Posted to common-commits@hadoop.apache.org by ma...@apache.org on 2011/08/26 21:14:33 UTC
svn commit: r1162214 - in /hadoop/common/branches/branch-0.20-security:
CHANGES.txt src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
src/test/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
Author: mattf
Date: Fri Aug 26 19:14:33 2011
New Revision: 1162214
URL: http://svn.apache.org/viewvc?rev=1162214&view=rev
Log:
HADOOP-6889. Unit tests updated to 17/Aug/2011 version. Contributed by John George.
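For context: HADOOP-6889 gives Hadoop IPC an optional rpcTimeout, so an RPC to an unresponsive DataNode fails fast with a SocketTimeoutException instead of blocking indefinitely. A minimal sketch of the client-side shape this enables follows (illustrative only, not part of the commit; dnInfo, fakeBlock, and conf stand in for the test fixtures set up in the diffs below):

    ClientDatanodeProtocol proxy = null;
    try {
      // Give up on the DN after 500 ms instead of hanging.
      proxy = DFSClient.createClientDatanodeProtocolProxy(
          dnInfo, conf, fakeBlock.getBlock(), fakeBlock.getBlockToken(), 500);
    } catch (SocketTimeoutException e) {
      // An unresponsive DN now surfaces as a timeout rather than a hang.
    } finally {
      if (proxy != null) {
        RPC.stopProxy(proxy);
      }
    }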
Modified:
hadoop/common/branches/branch-0.20-security/CHANGES.txt
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1162214&r1=1162213&r2=1162214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Fri Aug 26 19:14:33 2011
@@ -72,7 +72,8 @@ Release 0.20.205.0 - unreleased
priority (Robert Joseph Evans via mahadev)
HADOOP-6889. Make RPC to have an option to timeout - backport to
- 0.20-security. (John George and Ravi Prakash via mattf)
+ 0.20-security. Unit tests updated to 17/Aug/2011 version.
+ (John George and Ravi Prakash via mattf)
MAPREDUCE-2780. Use a utility method to set service in token.
(Daryn Sharp via jitendra)
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=1162214&r1=1162213&r2=1162214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java Fri Aug 26 19:14:33 2011
@@ -17,11 +17,16 @@
*/
package org.apache.hadoop.hdfs;
+import java.net.SocketTimeoutException;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.LongWritable;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,6 +37,9 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.hdfs.DFSClient.DFSInputStream;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+
import org.apache.hadoop.hdfs.server.common.*;
import org.apache.hadoop.hdfs.server.datanode.TestInterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
@@ -41,6 +50,11 @@ import org.apache.hadoop.security.Access
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import junit.framework.TestCase;
@@ -53,9 +67,51 @@ import org.mockito.invocation.Invocation
* properly in case of errors.
*/
public class TestDFSClientRetries extends TestCase {
+ private static final String ADDRESS = "0.0.0.0";
+ final static private int PING_INTERVAL = 1000;
+ final static private int MIN_SLEEP_TIME = 1000;
public static final Log LOG =
LogFactory.getLog(TestDFSClientRetries.class.getName());
-
+ final static private Configuration conf = new Configuration();
+
+ private static class TestServer extends Server {
+ private boolean sleep;
+ private Class<? extends Writable> responseClass;
+
+ public TestServer(int handlerCount, boolean sleep) throws IOException {
+ this(handlerCount, sleep, LongWritable.class, null);
+ }
+
+ public TestServer(int handlerCount, boolean sleep,
+ Class<? extends Writable> paramClass,
+ Class<? extends Writable> responseClass)
+ throws IOException {
+ super(ADDRESS, 0, paramClass, handlerCount, conf);
+ this.sleep = sleep;
+ this.responseClass = responseClass;
+ }
+
+ @Override
+ public Writable call(Class<?> protocol, Writable param, long receiveTime)
+ throws IOException {
+ if (sleep) {
+ // sleep a bit
+ try {
+ Thread.sleep(PING_INTERVAL + MIN_SLEEP_TIME);
+ } catch (InterruptedException e) {}
+ }
+ if (responseClass != null) {
+ try {
+ return responseClass.newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ return param; // echo param as result
+ }
+ }
+ }
+
// writes 'len' bytes of data to out.
private static void writeData(OutputStream out, int len) throws IOException {
byte [] buf = new byte[4096*16];
@@ -72,7 +128,6 @@ public class TestDFSClientRetries extend
*/
public void testWriteTimeoutAtDataNode() throws IOException,
InterruptedException {
- Configuration conf = new Configuration();
final int writeTimeout = 100; //milliseconds.
// set a very short write timeout for datanode, so that tests runs fast.
@@ -245,8 +300,6 @@ public class TestDFSClientRetries extend
public void testNotYetReplicatedErrors() throws IOException
{
- Configuration conf = new Configuration();
-
// allow 1 retry (2 total calls)
conf.setInt("dfs.client.block.write.locateFollowingBlock.retries", 1);
@@ -273,7 +326,6 @@ public class TestDFSClientRetries extend
long fileSize = 4096;
Path file = new Path("/testFile");
- Configuration conf = new Configuration();
MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
int maxBlockAcquires = DFSClient.getMaxBlockAcquireFailures(conf);
@@ -383,42 +435,36 @@ public class TestDFSClientRetries extend
}
}
-
- /**
- * The following test first creates a file.
- * It verifies the block information from a datanode.
- * Then, it stops the DN and observes timeout on connection attempt.
+ /** Test that a timeout occurs when the DN does not respond to an RPC.
+ * Start up a server and ask it to sleep for n seconds. Make an
+ * RPC to the server with rpcTimeout set to less than n and ensure
+ * that a SocketTimeoutException is thrown.
*/
- public void testDFSClientTimeout() throws Exception {
- Configuration conf = new Configuration();
- MiniDFSCluster cluster = null;
+ public void testClientDNProtocolTimeout() throws IOException {
+ final Server server = new TestServer(1, true);
+ server.start();
+
+ final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+ DatanodeID fakeDnId = new DatanodeID(
+ "localhost:" + addr.getPort(), "fake-storage", 0, addr.getPort());
+ DatanodeInfo dnInfo = new DatanodeInfo(fakeDnId);
+
+ LocatedBlock fakeBlock = new LocatedBlock(new Block(12345L), new DatanodeInfo[0]);
+
+ ClientDatanodeProtocol proxy = null;
try {
- cluster = new MiniDFSCluster(conf, 3, true, null);
- cluster.waitActive();
+ proxy = DFSClient.createClientDatanodeProtocolProxy(dnInfo, conf,
+ fakeBlock.getBlock(), fakeBlock.getBlockToken(), 500);
- //create a file
- DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
- String filestr = "/foo";
- Path filepath = new Path(filestr);
- DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L);
- assertTrue(dfs.getClient().exists(filestr));
-
- //get block info
- LocatedBlock locatedblock = TestInterDatanodeProtocol.getLastLocatedBlock(dfs.getClient().namenode, filestr);
- DatanodeInfo[] datanodeinfo = locatedblock.getLocations();
- assertTrue(datanodeinfo.length > 0);
-
- //shutdown a data node
- cluster.stopDataNode(datanodeinfo[0].getName());
- DFSClient.createClientDatanodeProtocolProxy(datanodeinfo[0], conf,
- locatedblock.getBlock(), locatedblock.getBlockToken(), 500);
- fail("Expected an exception to have been thrown");
- } catch (IOException e) {
- DFSClient.LOG.info("Got a SocketTimeoutException ", e);
+ fail ("Did not get expected exception: SocketTimeoutException");
+ } catch (SocketTimeoutException e) {
+ LOG.info("Got the expected Exception: SocketTimeoutException");
} finally {
- if (cluster != null) {cluster.shutdown();}
+ if (proxy != null) {
+ RPC.stopProxy(proxy);
+ }
+ server.stop();
}
}
-
}
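The stub TestServer above sleeps PING_INTERVAL + MIN_SLEEP_TIME (2000 ms) in call(), comfortably past the 500 ms rpcTimeout the test passes, so the client is guaranteed to hit its deadline first; sleeping past the ping interval also keeps the client's periodic keep-alive ping, where enabled, from masking the deadline. The same behavior can be exercised at the raw IPC layer. The following sketch is illustrative only and assumes the Client.call overload taking an rpcTimeout that HADOOP-6889 introduces:

    Server server = new TestServer(1, true);      // handler sleeps ~2000 ms
    server.start();
    InetSocketAddress addr = NetUtils.getConnectAddress(server);
    Client client = new Client(LongWritable.class, conf);
    try {
      // A 500 ms rpcTimeout is well under the handler's sleep, so the
      // call is expected to fail with a SocketTimeoutException.
      client.call(new LongWritable(42L), addr, null, null, 500);
      fail("Expected a SocketTimeoutException");
    } catch (SocketTimeoutException e) {
      // expected: the rpcTimeout elapsed while the handler slept
    } finally {
      client.stop();
      server.stop();
    }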
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java?rev=1162214&r1=1162213&r2=1162214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java Fri Aug 26 19:14:33 2011
@@ -19,6 +19,20 @@ package org.apache.hadoop.hdfs.server.da
import java.io.IOException;
import java.util.List;
+import java.net.InetSocketAddress;
+
+import java.net.SocketTimeoutException;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
@@ -34,11 +48,55 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
/**
* This tests InterDataNodeProtocol for block handling.
*/
public class TestInterDatanodeProtocol extends junit.framework.TestCase {
+ private static final String ADDRESS = "0.0.0.0";
+ final static private int PING_INTERVAL = 1000;
+ final static private int MIN_SLEEP_TIME = 1000;
+ private static Configuration conf = new Configuration();
+
+ private static class TestServer extends Server {
+ private boolean sleep;
+ private Class<? extends Writable> responseClass;
+
+ public TestServer(int handlerCount, boolean sleep) throws IOException {
+ this(handlerCount, sleep, LongWritable.class, null);
+ }
+
+ public TestServer(int handlerCount, boolean sleep,
+ Class<? extends Writable> paramClass,
+ Class<? extends Writable> responseClass)
+ throws IOException {
+ super(ADDRESS, 0, paramClass, handlerCount, conf);
+ this.sleep = sleep;
+ this.responseClass = responseClass;
+ }
+
+ @Override
+ public Writable call(Class<?> protocol, Writable param, long receiveTime)
+ throws IOException {
+ if (sleep) {
+ // sleep a bit
+ try {
+ Thread.sleep(PING_INTERVAL + MIN_SLEEP_TIME);
+ } catch (InterruptedException e) {}
+ }
+ if (responseClass != null) {
+ try {
+ return responseClass.newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ return param; // echo param as result
+ }
+ }
+ }
+
public static void checkMetaInfo(Block b, InterDatanodeProtocol idp,
DataBlockScanner scanner) throws IOException {
BlockMetaDataInfo metainfo = idp.getBlockMetaDataInfo(b);
@@ -68,7 +126,6 @@ public class TestInterDatanodeProtocol e
* Then, it updates the block with new information and verifies again.
*/
public void testBlockMetaDataInfo() throws Exception {
- Configuration conf = new Configuration();
MiniDFSCluster cluster = null;
try {
@@ -112,41 +169,30 @@ public class TestInterDatanodeProtocol e
}
}
- /**
- * The following test first creates a file.
- * It verifies the block information from a datanode.
- * Then, it stops the DN and observes timeout on connection attempt.
+ /** Test to verify that an InterDatanode RPC times out as expected when
+ * the server DN does not respond.
*/
public void testInterDNProtocolTimeout() throws Exception {
- Configuration conf = new Configuration();
- MiniDFSCluster cluster = null;
+ final Server server = new TestServer(1, true);
+ server.start();
- try {
- cluster = new MiniDFSCluster(conf, 3, true, null);
- cluster.waitActive();
+ final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+ DatanodeID fakeDnId = new DatanodeID(
+ "localhost:" + addr.getPort(), "fake-storage", 0, addr.getPort());
+ DatanodeInfo dInfo = new DatanodeInfo(fakeDnId);
+ InterDatanodeProtocol proxy = null;
- //create a file
- DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
- String filestr = "/foo";
- Path filepath = new Path(filestr);
- DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L);
- assertTrue(dfs.getClient().exists(filestr));
-
- //get block info
- LocatedBlock locatedblock = getLastLocatedBlock(dfs.getClient().namenode, filestr);
- DatanodeInfo[] datanodeinfo = locatedblock.getLocations();
- assertTrue(datanodeinfo.length > 0);
-
- //shutdown a data node
- cluster.stopDataNode(datanodeinfo[0].getName());
- InterDatanodeProtocol idp = DataNode.createInterDataNodeProtocolProxy(
- datanodeinfo[0], conf, 500);
- fail("Expected an exception to have been thrown");
- } catch (IOException e) {
- InterDatanodeProtocol.LOG.info("Got a SocketTimeoutException ", e);
+ try {
+ proxy = DataNode.createInterDataNodeProtocolProxy(
+ dInfo, conf, 500);
+ fail ("Expected SocketTimeoutException exception, but did not get.");
+ } catch (SocketTimeoutException e) {
+ DataNode.LOG.info("Got expected Exception: SocketTimeoutException");
} finally {
- if (cluster != null) {cluster.shutdown();}
+ if (proxy != null) {
+ RPC.stopProxy(proxy);
+ }
+ server.stop();
}
}
-
}
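Note that both test files now carry identical copies of the TestServer stub and its timing constants. A natural follow-up would be to hoist the stub into a shared test utility. A sketch of what that could look like, where the class name and package are hypothetical and not part of this commit, but the Server constructor and call() signature match those used in the diffs above:

    package org.apache.hadoop.hdfs;   // hypothetical location

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.ipc.Server;

    /** Stub RPC server whose handlers can sleep past a caller's rpcTimeout. */
    public class SleepingTestServer extends Server {
      private final boolean sleep;
      private final long sleepMillis;

      public SleepingTestServer(Configuration conf, int handlerCount,
          boolean sleep, long sleepMillis) throws IOException {
        super("0.0.0.0", 0, LongWritable.class, handlerCount, conf);
        this.sleep = sleep;
        this.sleepMillis = sleepMillis;
      }

      @Override
      public Writable call(Class<?> protocol, Writable param, long receiveTime)
          throws IOException {
        if (sleep) {
          try {
            Thread.sleep(sleepMillis);   // hold the handler past rpcTimeout
          } catch (InterruptedException ignored) {}
        }
        return param;                    // echo the request back to the caller
      }
    }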