You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by at...@apache.org on 2013/04/13 04:14:01 UTC
svn commit: r1467538 [3/3] - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/
src/main/java/ src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/net/
src/main/java/org/apache/hadoop/hdfs/protocol/datatrans...
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java Sat Apr 13 02:13:59 2013
@@ -17,90 +17,333 @@
*/
package org.apache.hadoop.hdfs;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Assert;
import org.junit.Test;
public class TestBlockReaderLocal {
- static MiniDFSCluster cluster;
- static HdfsConfiguration conf;
+  /**
+   * Asserts that two equally sized regions of two byte arrays hold the same
+   * bytes. The test fails at the first mismatch; the reported index is
+   * relative to the start of the regions.
+   *
+   * @param buf1 The first array
+   * @param off1 Where the region starts in the first array
+   * @param buf2 The second array
+   * @param off2 Where the region starts in the second array
+   * @param len  The number of bytes to compare
+   */
+  public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2,
+      int off2, int len) {
+    int idx = 0;
+    while (idx < len) {
+      final byte first = buf1[off1 + idx];
+      final byte second = buf2[off2 + idx];
+      if (first != second) {
+        Assert.fail("arrays differ at byte " + idx + ". " +
+            "The first array has " + (int)first +
+            ", but the second array has " + (int)second);
+      }
+      idx++;
+    }
+  }
- @BeforeClass
- public static void setupCluster() throws IOException {
- conf = new HdfsConfiguration();
+  /**
+   * Similar to IOUtils#readFully(). Reads bytes in a loop until exactly
+   * len bytes have been stored in the buffer, starting at off.
+   *
+   * The buffer's limit is pinned to the end of the requested region on every
+   * pass. Computing it as "current offset + len" would let the limit creep
+   * past the region after a short read, because the offset advances while
+   * len does not.
+   *
+   * @param reader The BlockReaderLocal to read bytes from
+   * @param buf    The ByteBuffer to read into
+   * @param off    The offset in the buffer to read into
+   * @param len    The number of bytes to read.
+   *
+   * @throws IOException If it could not read the requested number of bytes
+   */
+  private static void readFully(BlockReaderLocal reader,
+      ByteBuffer buf, int off, int len) throws IOException {
+    // Fixed end of the destination region; must not move as 'off' advances.
+    final int end = off + len;
+    int amt = len;
+    while (amt > 0) {
+      buf.limit(end);
+      buf.position(off);
+      long ret = reader.read(buf);
+      if (ret < 0) {
+        throw new EOFException( "Premature EOF from BlockReaderLocal " +
+            "after reading " + (len - amt) + " byte(s).");
+      }
+      amt -= ret;
+      off += ret;
+    }
+  }
+
+  /**
+   * A single scenario executed by runBlockReaderLocalTest().
+   */
+  private static interface BlockReaderLocalTest {
+    /** Length, in bytes, of the file written for every scenario. */
+    int TEST_LENGTH = 12345;
+
+    /**
+     * Called with the block's data file before a reader is opened on it.
+     *
+     * @param blockFile      The on-disk block data file
+     * @param usingChecksums Whether the reader will be created with checksums
+     */
+    void setup(File blockFile, boolean usingChecksums) throws IOException;
+
+    /**
+     * Runs the scenario's reads.
+     *
+     * @param reader   The reader opened on the block
+     * @param original The file contents as read back through the filesystem
+     */
+    void doTest(BlockReaderLocal reader, byte[] original) throws IOException;
+  }
+
+ /**
+ * Runs one BlockReaderLocalTest scenario against a freshly written block.
+ *
+ * Starts a single-datanode MiniDFSCluster, writes a TEST_LENGTH-byte file,
+ * reads it back through the filesystem to capture the expected bytes, then
+ * shuts the cluster down and opens the block's data and metadata files
+ * directly with a BlockReaderLocal. test.setup() is called on the data file
+ * before it is opened; test.doTest() is called with the reader and the
+ * expected bytes.
+ *
+ * @param test The scenario to run
+ * @param checksum Passed to the BlockReaderLocal constructor; its negation
+ * is stored under DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY
+ *
+ * @throws IOException If cluster setup, file access or the scenario fails
+ */
+ public void runBlockReaderLocalTest(BlockReaderLocalTest test,
+ boolean checksum) throws IOException {
+ MiniDFSCluster cluster = null;
+ HdfsConfiguration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.
+ DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, !checksum);
+ conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
+ FileInputStream dataIn = null, checkIn = null;
+ final Path TEST_PATH = new Path("/a");
+ final long RANDOM_SEED = 4567L;
+ BlockReaderLocal blockReaderLocal = null;
+ FSDataInputStream fsIn = null;
+ byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
+
+ try {
+ // Write the test file on a one-datanode cluster.
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ FileSystem fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, TEST_PATH,
+ BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
+ // The checked exceptions of waitReplication are turned into test failures.
+ try {
+ DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+ } catch (InterruptedException e) {
+ Assert.fail("unexpected InterruptedException during " +
+ "waitReplication: " + e);
+ } catch (TimeoutException e) {
+ Assert.fail("unexpected TimeoutException during " +
+ "waitReplication: " + e);
+ }
+ // Capture the file contents as seen through the filesystem read path.
+ fsIn = fs.open(TEST_PATH);
+ IOUtils.readFully(fsIn, original, 0,
+ BlockReaderLocalTest.TEST_LENGTH);
+ fsIn.close();
+ fsIn = null;
+ // Locate the block's data and metadata files on datanode 0.
+ ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+ File dataFile = MiniDFSCluster.getBlockFile(0, block);
+ File metaFile = MiniDFSCluster.getBlockMetadataFile(0, block);
+
+ DatanodeID datanodeID = cluster.getDataNodes().get(0).getDatanodeId();
+ // The cluster is shut down before the scenario gets to touch the block file.
+ cluster.shutdown();
+ cluster = null;
+ test.setup(dataFile, checksum);
+ dataIn = new FileInputStream(dataFile);
+ checkIn = new FileInputStream(metaFile);
+ blockReaderLocal = new BlockReaderLocal(conf,
+ TEST_PATH.getName(), block, 0, -1,
+ dataIn, checkIn, datanodeID, checksum);
+ // Drop the local references so the finally block does not close the
+ // streams itself; presumably the reader's close() now closes them --
+ // TODO confirm against BlockReaderLocal.
+ dataIn = null;
+ checkIn = null;
+ test.doTest(blockReaderLocal, original);
+ } finally {
+ // Release whatever is still referenced; a null reference means the
+ // resource was already released or handed off above.
+ if (fsIn != null) fsIn.close();
+ if (cluster != null) cluster.shutdown();
+ if (dataIn != null) dataIn.close();
+ if (checkIn != null) checkIn.close();
+ if (blockReaderLocal != null) blockReaderLocal.close(null, null);
+ }
+ }
+
+  /**
+   * Scenario that performs no reads: the reader is only opened and closed by
+   * the surrounding harness.
+   */
+  private static class TestBlockReaderLocalImmediateClose
+      implements BlockReaderLocalTest {
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException {
+      // Nothing to prepare.
+    }
+
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      // Intentionally empty.
+    }
+  }
+
+  /** Opens and closes a reader without reading, with checksums on, then off. */
+  @Test
+  public void testBlockReaderLocalImmediateClose() throws IOException {
+    for (boolean checksum : new boolean[] { true, false }) {
+      runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(),
+          checksum);
+    }
+  }
+
+  /**
+   * Scenario that reads four consecutive regions (512, 512, 513 and 514
+   * bytes) through the byte-array readFully() API and checks each one
+   * against the expected contents.
+   */
+  private static class TestBlockReaderSimpleReads
+      implements BlockReaderLocalTest {
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException {
+      // Nothing to prepare.
+    }
+
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      byte buf[] = new byte[TEST_LENGTH];
+      final int[] readLengths = { 512, 512, 513, 514 };
+      int pos = 0;
+      // Each read starts where the previous one ended: 0, 512, 1024, 1537.
+      for (int count : readLengths) {
+        reader.readFully(buf, pos, count);
+        assertArrayRegionsEqual(original, pos, buf, pos, count);
+        pos += count;
+      }
+    }
+  }
+
+  /** Runs the simple-reads scenario with checksums enabled. */
+  @Test
+  public void testBlockReaderSimpleReads() throws IOException {
+    final BlockReaderLocalTest scenario = new TestBlockReaderSimpleReads();
+    runBlockReaderLocalTest(scenario, true);
+  }
+
+  /** Runs the simple-reads scenario with checksums disabled. */
+  @Test
+  public void testBlockReaderSimpleReadsNoChecksum() throws IOException {
+    final BlockReaderLocalTest scenario = new TestBlockReaderSimpleReads();
+    runBlockReaderLocalTest(scenario, false);
+  }
+
+  /**
+   * Scenario that reads seven back-to-back regions of varying sizes through
+   * the byte-array readFully() API and verifies every region against the
+   * expected contents.
+   *
+   * Nothing is skipped here: the bytes at offset 810 and at offsets
+   * 816 through 1715 are read like all the others, so they are verified too.
+   */
+  private static class TestBlockReaderLocalArrayReads2
+      implements BlockReaderLocalTest {
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException { }
+
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      byte buf[] = new byte[TEST_LENGTH];
+      reader.readFully(buf, 0, 10);
+      assertArrayRegionsEqual(original, 0, buf, 0, 10);
+      reader.readFully(buf, 10, 100);
+      assertArrayRegionsEqual(original, 10, buf, 10, 100);
+      reader.readFully(buf, 110, 700);
+      assertArrayRegionsEqual(original, 110, buf, 110, 700);
+      reader.readFully(buf, 810, 1); // read the single byte at offset 810
+      assertArrayRegionsEqual(original, 810, buf, 810, 1);
+      reader.readFully(buf, 811, 5);
+      assertArrayRegionsEqual(original, 811, buf, 811, 5);
+      reader.readFully(buf, 816, 900); // read offsets 816 through 1715
+      assertArrayRegionsEqual(original, 816, buf, 816, 900);
+      reader.readFully(buf, 1716, 5);
+      assertArrayRegionsEqual(original, 1716, buf, 1716, 5);
+    }
+  }
+
+  /** Runs the array-reads scenario with checksums enabled. */
+  @Test
+  public void testBlockReaderLocalArrayReads2() throws IOException {
+    final BlockReaderLocalTest scenario =
+        new TestBlockReaderLocalArrayReads2();
+    runBlockReaderLocalTest(scenario, true);
+  }
- conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
- conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+ @Test
+ public void testBlockReaderLocalArrayReads2NoChecksum()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
false);
- conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
- UserGroupInformation.getCurrentUser().getShortUserName());
+ }
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+  /**
+   * Scenario that reads through the ByteBuffer API: three consecutive
+   * regions, a one-byte skip, then one more region. Every region read is
+   * checked against the expected contents.
+   */
+  private static class TestBlockReaderLocalByteBufferReads
+      implements BlockReaderLocalTest {
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException {
+      // Nothing to prepare.
+    }
+
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      // The buffer wraps this array, so reads into the buffer land in it.
+      final byte[] backing = new byte[TEST_LENGTH];
+      final ByteBuffer buf = ByteBuffer.wrap(backing);
+      int pos = 0;
+      for (int count : new int[] { 10, 100, 700 }) {
+        readFully(reader, buf, pos, count);
+        assertArrayRegionsEqual(original, pos, backing, pos, count);
+        pos += count;
+      }
+      reader.skip(1); // skip from offset 810 to offset 811
+      pos += 1;
+      readFully(reader, buf, pos, 5);
+      assertArrayRegionsEqual(original, pos, backing, pos, 5);
+    }
+  }
+
+ @Test
+ public void testBlockReaderLocalByteBufferReads()
+ throws IOException {
+ runBlockReaderLocalTest(
+ new TestBlockReaderLocalByteBufferReads(), true);
}
- @AfterClass
- public static void teardownCluster() {
- cluster.shutdown();
+ @Test
+ public void testBlockReaderLocalByteBufferReadsNoChecksum()
+ throws IOException {
+ runBlockReaderLocalTest(
+ new TestBlockReaderLocalByteBufferReads(), false);
}
+
+  /**
+   * Scenario that overwrites the first 14 bytes of the block file with zeros
+   * before the reader is opened, then attempts a 10-byte read from the start.
+   * With checksums the read must fail with an IOException; without checksums
+   * it must succeed.
+   */
+  private static class TestBlockReaderLocalReadCorruptStart
+      implements BlockReaderLocalTest {
+    boolean usingChecksums = false;
+
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException {
+      RandomAccessFile bf = null;
+      // Remembered so that doTest() knows which outcome to expect.
+      this.usingChecksums = usingChecksums;
+      try {
+        bf = new RandomAccessFile(blockFile, "rw");
+        bf.write(new byte[] {0,0,0,0,0,0,0,0,0,0,0,0,0,0});
+      } finally {
+        if (bf != null) bf.close();
+      }
+    }
- /**
- * Test that, in the case of an error, the position and limit of a ByteBuffer
- * are left unchanged. This is not mandated by ByteBufferReadable, but clients
- * of this class might immediately issue a retry on failure, so it's polite.
- */
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      byte buf[] = new byte[TEST_LENGTH];
+      if (usingChecksums) {
+        try {
+          reader.readFully(buf, 0, 10);
+          Assert.fail("did not detect corruption");
+        } catch (IOException e) {
+          // expected
+        }
+      } else {
+        reader.readFully(buf, 0, 10);
+      }
+    }
+  }
+
@Test
- public void testStablePositionAfterCorruptRead() throws Exception {
- final short REPL_FACTOR = 1;
- final long FILE_LENGTH = 512L;
- cluster.waitActive();
- FileSystem fs = cluster.getFileSystem();
-
- Path path = new Path("/corrupted");
-
- DFSTestUtil.createFile(fs, path, FILE_LENGTH, REPL_FACTOR, 12345L);
- DFSTestUtil.waitReplication(fs, path, REPL_FACTOR);
-
- ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, path);
- int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block);
- assertEquals("All replicas not corrupted", REPL_FACTOR, blockFilesCorrupted);
-
- FSDataInputStream dis = cluster.getFileSystem().open(path);
- ByteBuffer buf = ByteBuffer.allocateDirect((int)FILE_LENGTH);
- boolean sawException = false;
- try {
- dis.read(buf);
- } catch (ChecksumException ex) {
- sawException = true;
+ public void testBlockReaderLocalReadCorruptStart()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true);
+ }
+
+  /**
+   * Scenario that overwrites 14 bytes of the block file with zeros, starting
+   * at offset 1539, before the reader is opened. The reads before offset 816
+   * are verified against the expected contents; the final 900-byte read
+   * starting at offset 816 covers the overwritten bytes and must raise a
+   * ChecksumException if, and only if, checksums are in use.
+   */
+  private static class TestBlockReaderLocalReadCorrupt
+      implements BlockReaderLocalTest {
+    boolean usingChecksums = false;
+
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException {
+      RandomAccessFile bf = null;
+      // Remembered so that doTest() knows which outcome to expect.
+      this.usingChecksums = usingChecksums;
+      try {
+        bf = new RandomAccessFile(blockFile, "rw");
+        bf.seek(1539);
+        bf.write(new byte[] {0,0,0,0,0,0,0,0,0,0,0,0,0,0});
+      } finally {
+        if (bf != null) bf.close();
+      }
}
- assertTrue(sawException);
- assertEquals(0, buf.position());
- assertEquals(buf.capacity(), buf.limit());
-
- dis = cluster.getFileSystem().open(path);
- buf.position(3);
- buf.limit(25);
- sawException = false;
- try {
- dis.read(buf);
- } catch (ChecksumException ex) {
- sawException = true;
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      byte buf[] = new byte[TEST_LENGTH];
+      try {
+        reader.readFully(buf, 0, 10);
+        assertArrayRegionsEqual(original, 0, buf, 0, 10);
+        reader.readFully(buf, 10, 100);
+        assertArrayRegionsEqual(original, 10, buf, 10, 100);
+        reader.readFully(buf, 110, 700);
+        assertArrayRegionsEqual(original, 110, buf, 110, 700);
+        reader.skip(1); // skip from offset 810 to offset 811
+        reader.readFully(buf, 811, 5);
+        assertArrayRegionsEqual(original, 811, buf, 811, 5);
+        reader.readFully(buf, 816, 900);
+        if (usingChecksums) {
+          // We should detect the corruption when using a checksum file.
+          Assert.fail("did not detect corruption");
+        }
+      } catch (ChecksumException e) {
+        if (!usingChecksums) {
+          Assert.fail("didn't expect to get ChecksumException: not " +
+              "using checksums.");
+        }
+      }
}
+  }
- assertTrue(sawException);
- assertEquals(3, buf.position());
- assertEquals(25, buf.limit());
+ @Test
+ public void testBlockReaderLocalReadCorrupt()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true);
+ runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false);
}
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java Sat Apr 13 02:13:59 2013
@@ -61,7 +61,7 @@ public class TestClientBlockVerification
util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
verify(reader).sendReadResult(Status.CHECKSUM_OK);
- reader.close();
+ reader.close(null, null);
}
/**
@@ -76,7 +76,7 @@ public class TestClientBlockVerification
// We asked the blockreader for the whole file, and only read
// half of it, so no CHECKSUM_OK
verify(reader, never()).sendReadResult(Status.CHECKSUM_OK);
- reader.close();
+ reader.close(null, null);
}
/**
@@ -92,7 +92,7 @@ public class TestClientBlockVerification
// And read half the file
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
verify(reader).sendReadResult(Status.CHECKSUM_OK);
- reader.close();
+ reader.close(null, null);
}
/**
@@ -111,7 +111,7 @@ public class TestClientBlockVerification
util.getBlockReader(testBlock, startOffset, length));
util.readAndCheckEOS(reader, length, true);
verify(reader).sendReadResult(Status.CHECKSUM_OK);
- reader.close();
+ reader.close(null, null);
}
}
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java Sat Apr 13 02:13:59 2013
@@ -18,28 +18,20 @@
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.security.PrivilegedExceptionAction;
+
+import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.security.token.Token;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
@@ -55,59 +47,31 @@ public class TestConnCache {
static final int BLOCK_SIZE = 4096;
static final int FILE_SIZE = 3 * BLOCK_SIZE;
- final static int CACHE_SIZE = 4;
- final static long CACHE_EXPIRY_MS = 200;
- static Configuration conf = null;
- static MiniDFSCluster cluster = null;
- static FileSystem fs = null;
- static SocketCache cache;
-
- static final Path testFile = new Path("/testConnCache.dat");
- static byte authenticData[] = null;
-
- static BlockReaderTestUtil util = null;
-
/**
* A mock Answer to remember the BlockReader used.
*
* It verifies that all invocation to DFSInputStream.getBlockReader()
- * use the same socket.
+ * use the same peer.
*/
private class MockGetBlockReader implements Answer<RemoteBlockReader2> {
public RemoteBlockReader2 reader = null;
- private Socket sock = null;
+ private Peer peer = null;
@Override
public RemoteBlockReader2 answer(InvocationOnMock invocation) throws Throwable {
RemoteBlockReader2 prevReader = reader;
reader = (RemoteBlockReader2) invocation.callRealMethod();
- if (sock == null) {
- sock = reader.dnSock;
+ if (peer == null) {
+ peer = reader.getPeer();
} else if (prevReader != null) {
- assertSame("DFSInputStream should use the same socket",
- sock, reader.dnSock);
+ Assert.assertSame("DFSInputStream should use the same peer",
+ peer, reader.getPeer());
}
return reader;
}
}
- @BeforeClass
- public static void setupCluster() throws Exception {
- final int REPLICATION_FACTOR = 1;
-
- /* create a socket cache. There is only one socket cache per jvm */
- cache = SocketCache.getInstance(CACHE_SIZE, CACHE_EXPIRY_MS);
-
- util = new BlockReaderTestUtil(REPLICATION_FACTOR);
- cluster = util.getCluster();
- conf = util.getConf();
- fs = cluster.getFileSystem();
-
- authenticData = util.writeFile(testFile, FILE_SIZE / 1024);
- }
-
-
/**
* (Optionally) seek to position, read and verify data.
*
@@ -117,9 +81,10 @@ public class TestConnCache {
long pos,
byte[] buffer,
int offset,
- int length)
+ int length,
+ byte[] authenticData)
throws IOException {
- assertTrue("Test buffer too small", buffer.length >= offset + length);
+ Assert.assertTrue("Test buffer too small", buffer.length >= offset + length);
if (pos >= 0)
in.seek(pos);
@@ -129,7 +94,7 @@ public class TestConnCache {
while (length > 0) {
int cnt = in.read(buffer, offset, length);
- assertTrue("Error in read", cnt > 0);
+ Assert.assertTrue("Error in read", cnt > 0);
offset += cnt;
length -= cnt;
}
@@ -145,115 +110,22 @@ public class TestConnCache {
}
/**
- * Test the SocketCache itself.
- */
- @Test
- public void testSocketCache() throws Exception {
- // Make a client
- InetSocketAddress nnAddr =
- new InetSocketAddress("localhost", cluster.getNameNodePort());
- DFSClient client = new DFSClient(nnAddr, conf);
-
- // Find out the DN addr
- LocatedBlock block =
- client.getNamenode().getBlockLocations(
- testFile.toString(), 0, FILE_SIZE)
- .getLocatedBlocks().get(0);
- DataNode dn = util.getDataNode(block);
- InetSocketAddress dnAddr = dn.getXferAddress();
-
-
- // Make some sockets to the DN
- Socket[] dnSockets = new Socket[CACHE_SIZE];
- for (int i = 0; i < dnSockets.length; ++i) {
- dnSockets[i] = client.socketFactory.createSocket(
- dnAddr.getAddress(), dnAddr.getPort());
- }
-
-
- // Insert a socket to the NN
- Socket nnSock = new Socket(nnAddr.getAddress(), nnAddr.getPort());
- cache.put(nnSock, null);
- assertSame("Read the write", nnSock, cache.get(nnAddr).sock);
- cache.put(nnSock, null);
-
- // Insert DN socks
- for (Socket dnSock : dnSockets) {
- cache.put(dnSock, null);
- }
-
- assertEquals("NN socket evicted", null, cache.get(nnAddr));
- assertTrue("Evicted socket closed", nnSock.isClosed());
-
- // Lookup the DN socks
- for (Socket dnSock : dnSockets) {
- assertEquals("Retrieve cached sockets", dnSock, cache.get(dnAddr).sock);
- dnSock.close();
- }
-
- assertEquals("Cache is empty", 0, cache.size());
- }
-
-
- /**
- * Test the SocketCache expiry.
- * Verify that socket cache entries expire after the set
- * expiry time.
- */
- @Test
- public void testSocketCacheExpiry() throws Exception {
- // Make a client
- InetSocketAddress nnAddr =
- new InetSocketAddress("localhost", cluster.getNameNodePort());
- DFSClient client = new DFSClient(nnAddr, conf);
-
- // Find out the DN addr
- LocatedBlock block =
- client.getNamenode().getBlockLocations(
- testFile.toString(), 0, FILE_SIZE)
- .getLocatedBlocks().get(0);
- DataNode dn = util.getDataNode(block);
- InetSocketAddress dnAddr = dn.getXferAddress();
-
-
- // Make some sockets to the DN and put in cache
- Socket[] dnSockets = new Socket[CACHE_SIZE];
- for (int i = 0; i < dnSockets.length; ++i) {
- dnSockets[i] = client.socketFactory.createSocket(
- dnAddr.getAddress(), dnAddr.getPort());
- cache.put(dnSockets[i], null);
- }
-
- // Client side still has the sockets cached
- assertEquals(CACHE_SIZE, client.socketCache.size());
-
- //sleep for a second and see if it expired
- Thread.sleep(CACHE_EXPIRY_MS + 1000);
-
- // Client side has no sockets cached
- assertEquals(0, client.socketCache.size());
-
- //sleep for another second and see if
- //the daemon thread runs fine on empty cache
- Thread.sleep(CACHE_EXPIRY_MS + 1000);
- }
-
-
- /**
* Read a file served entirely from one DN. Seek around and read from
* different offsets. And verify that they all use the same socket.
- *
- * @throws java.io.IOException
+ * @throws Exception
*/
@Test
@SuppressWarnings("unchecked")
- public void testReadFromOneDN() throws IOException {
- LOG.info("Starting testReadFromOneDN()");
+ public void testReadFromOneDN() throws Exception {
+ BlockReaderTestUtil util = new BlockReaderTestUtil(1,
+ new HdfsConfiguration());
+ final Path testFile = new Path("/testConnCache.dat");
+ byte authenticData[] = util.writeFile(testFile, FILE_SIZE / 1024);
DFSClient client = new DFSClient(
- new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);
- DFSInputStream in = spy(client.open(testFile.toString()));
+ new InetSocketAddress("localhost",
+ util.getCluster().getNameNodePort()), util.getConf());
+ DFSInputStream in = Mockito.spy(client.open(testFile.toString()));
LOG.info("opened " + testFile.toString());
-
byte[] dataBuf = new byte[BLOCK_SIZE];
MockGetBlockReader answer = new MockGetBlockReader();
@@ -270,18 +142,15 @@ public class TestConnCache {
Matchers.anyString());
// Initial read
- pread(in, 0, dataBuf, 0, dataBuf.length);
+ pread(in, 0, dataBuf, 0, dataBuf.length, authenticData);
// Read again and verify that the socket is the same
- pread(in, FILE_SIZE - dataBuf.length, dataBuf, 0, dataBuf.length);
- pread(in, 1024, dataBuf, 0, dataBuf.length);
- pread(in, -1, dataBuf, 0, dataBuf.length); // No seek; just read
- pread(in, 64, dataBuf, 0, dataBuf.length / 2);
+ pread(in, FILE_SIZE - dataBuf.length, dataBuf, 0, dataBuf.length,
+ authenticData);
+ pread(in, 1024, dataBuf, 0, dataBuf.length, authenticData);
+ // No seek; just read
+ pread(in, -1, dataBuf, 0, dataBuf.length, authenticData);
+ pread(in, 64, dataBuf, 0, dataBuf.length / 2, authenticData);
in.close();
}
-
- @AfterClass
- public static void teardownCluster() throws Exception {
- util.shutdown();
- }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java Sat Apr 13 02:13:59 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -70,7 +71,7 @@ public class TestDataTransferKeepalive {
.numDataNodes(1).build();
fs = cluster.getFileSystem();
dfsClient = ((DistributedFileSystem)fs).dfs;
- dfsClient.socketCache.clear();
+ dfsClient.peerCache.clear();
String poolId = cluster.getNamesystem().getBlockPoolId();
dn = cluster.getDataNodes().get(0);
@@ -93,13 +94,13 @@ public class TestDataTransferKeepalive {
DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L);
// Clients that write aren't currently re-used.
- assertEquals(0, dfsClient.socketCache.size());
+ assertEquals(0, dfsClient.peerCache.size());
assertXceiverCount(0);
// Reads the file, so we should get a
// cached socket, and should have an xceiver on the other side.
DFSTestUtil.readFile(fs, TEST_FILE);
- assertEquals(1, dfsClient.socketCache.size());
+ assertEquals(1, dfsClient.peerCache.size());
assertXceiverCount(1);
// Sleep for a bit longer than the keepalive timeout
@@ -110,13 +111,13 @@ public class TestDataTransferKeepalive {
// The socket is still in the cache, because we don't
// notice that it's closed until we try to read
// from it again.
- assertEquals(1, dfsClient.socketCache.size());
+ assertEquals(1, dfsClient.peerCache.size());
// Take it out of the cache - reading should
// give an EOF.
- Socket s = dfsClient.socketCache.get(dnAddr).sock;
- assertNotNull(s);
- assertEquals(-1, NetUtils.getInputStream(s).read());
+ Peer peer = dfsClient.peerCache.get(dn.getDatanodeId(), false);
+ assertNotNull(peer);
+ assertEquals(-1, peer.getInputStream().read());
}
/**
@@ -175,14 +176,14 @@ public class TestDataTransferKeepalive {
}
DFSClient client = ((DistributedFileSystem)fs).dfs;
- assertEquals(5, client.socketCache.size());
+ assertEquals(5, client.peerCache.size());
// Let all the xceivers timeout
Thread.sleep(1500);
assertXceiverCount(0);
// Client side still has the sockets cached
- assertEquals(5, client.socketCache.size());
+ assertEquals(5, client.peerCache.size());
// Reading should not throw an exception.
DFSTestUtil.readFile(fs, TEST_FILE);
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java Sat Apr 13 02:13:59 2013
@@ -17,53 +17,30 @@
*/
package org.apache.hadoop.hdfs;
-import java.io.IOException;
-
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.log4j.Level;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Test;
public class TestParallelRead extends TestParallelReadUtil {
-
@BeforeClass
static public void setupCluster() throws Exception {
- setupCluster(DEFAULT_REPLICATION_FACTOR, new HdfsConfiguration());
+ // This is a test of the normal (TCP) read path. For this reason, we turn
+ // off both short-circuit local reads and UNIX domain socket data traffic.
+ HdfsConfiguration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
+ false);
+ // dfs.domain.socket.path should be ignored because the previous two keys
+ // were set to false. This is a regression test for HDFS-4473.
+ conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, "/will/not/be/created");
+
+ setupCluster(DEFAULT_REPLICATION_FACTOR, conf);
}
@AfterClass
static public void teardownCluster() throws Exception {
TestParallelReadUtil.teardownCluster();
}
-
- /**
- * Do parallel read several times with different number of files and threads.
- *
- * Note that while this is the only "test" in a junit sense, we're actually
- * dispatching a lot more. Failures in the other methods (and other threads)
- * need to be manually collected, which is inconvenient.
- */
- @Test
- public void testParallelReadCopying() throws IOException {
- runTestWorkload(new CopyingReadWorkerHelper());
- }
-
- @Test
- public void testParallelReadByteBuffer() throws IOException {
- runTestWorkload(new DirectReadWorkerHelper());
- }
-
- @Test
- public void testParallelReadMixed() throws IOException {
- runTestWorkload(new MixedWorkloadHelper());
- }
-
- @Test
- public void testParallelNoChecksums() throws IOException {
- verifyChecksums = false;
- runTestWorkload(new MixedWorkloadHelper());
- }
-
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java Sat Apr 13 02:13:59 2013
@@ -32,12 +32,18 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
+import org.junit.Assume;
+import org.junit.Ignore;
+import org.junit.Test;
/**
* Driver class for testing the use of DFSInputStream by multiple concurrent
- * readers, using the different read APIs. See subclasses for the actual test
- * cases.
+ * readers, using the different read APIs.
+ *
+ * This class is marked as @Ignore so that junit doesn't try to execute the
+ * tests in here directly. They are executed from subclasses.
*/
+@Ignore
public class TestParallelReadUtil {
static final Log LOG = LogFactory.getLog(TestParallelReadUtil.class);
@@ -388,4 +394,31 @@ public class TestParallelReadUtil {
util.shutdown();
}
+ /**
+ * Do parallel read several times with different number of files and threads,
+ * driving the workload with a CopyingReadWorkerHelper.
+ *
+ * Note that while each method here is a single "test" in a junit sense, we're
+ * actually dispatching a lot more. Failures in the other methods (and other
+ * threads) need to be manually collected, which is inconvenient.
+ */
+ @Test
+ public void testParallelReadCopying() throws IOException {
+ runTestWorkload(new CopyingReadWorkerHelper());
+ }
+
+ @Test
+ public void testParallelReadByteBuffer() throws IOException {
+ runTestWorkload(new DirectReadWorkerHelper());
+ }
+
+ @Test
+ public void testParallelReadMixed() throws IOException {
+ runTestWorkload(new MixedWorkloadHelper());
+ }
+
+ @Test
+ public void testParallelNoChecksums() throws IOException {
+ verifyChecksums = false;
+ runTestWorkload(new MixedWorkloadHelper());
+ }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java Sat Apr 13 02:13:59 2013
@@ -21,10 +21,13 @@ import static org.junit.Assert.assertTru
import static org.junit.Assert.assertFalse;
import java.io.EOFException;
+import java.io.File;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.net.URI;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -32,7 +35,6 @@ import org.apache.hadoop.fs.FSDataOutput
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
-import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -40,15 +42,21 @@ import org.apache.hadoop.hdfs.protocol.E
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
+import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
+import static org.hamcrest.CoreMatchers.*;
/**
* Test for short circuit read functionality using {@link BlockReaderLocal}.
@@ -58,9 +66,24 @@ import org.junit.Test;
* system.
*/
public class TestShortCircuitLocalRead {
+ private static TemporarySocketDirectory sockDir;
- static final String DIR = "/" + TestShortCircuitLocalRead.class.getSimpleName() + "/";
+ @BeforeClass
+ public static void init() {
+ sockDir = new TemporarySocketDirectory(); // one directory for the whole class; closed again in shutdown()
+ DomainSocket.disableBindPathValidation(); // NOTE(review): presumably relaxes the checks on where a socket may be bound -- confirm
+ }
+
+ @AfterClass
+ public static void shutdown() throws IOException {
+ sockDir.close(); // closes the socket directory created in init()
+ }
+ @Before
+ public void before() {
+ Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null)); // an unmet assumption skips the test: run only when DomainSocket reports no loading failure
+ }
+
static final long seed = 0xDEADBEEFL;
static final int blockSize = 5120;
boolean simulatedStorage = false;
@@ -84,7 +107,9 @@ public class TestShortCircuitLocalRead {
for (int idx = 0; idx < len; idx++) {
if (expected[from + idx] != actual[idx]) {
Assert.fail(message + " byte " + (from + idx) + " differs. expected "
- + expected[from + idx] + " actual " + actual[idx]);
+ + expected[from + idx] + " actual " + actual[idx] +
+ "\nexpected: " + StringUtils.byteToHexString(expected, from, from + len) +
+ "\nactual: " + StringUtils.byteToHexString(actual, 0, len));
}
}
}
@@ -96,11 +121,13 @@ public class TestShortCircuitLocalRead {
/** Check file content, reading as user {@code readingUser} */
static void checkFileContent(URI uri, Path name, byte[] expected,
int readOffset, String readingUser, Configuration conf,
- boolean shortCircuitFails)
+ boolean legacyShortCircuitFails)
throws IOException, InterruptedException {
// Ensure short circuit is enabled
DistributedFileSystem fs = getFileSystem(readingUser, uri, conf);
- assertTrue(fs.getClient().getShortCircuitLocalReads());
+ if (legacyShortCircuitFails) {
+ assertTrue(fs.getClient().useLegacyBlockReaderLocal());
+ }
FSDataInputStream stm = fs.open(name);
byte[] actual = new byte[expected.length-readOffset];
@@ -127,9 +154,8 @@ public class TestShortCircuitLocalRead {
}
checkData(actual, readOffset, expected, "Read 3");
- if (shortCircuitFails) {
- // short circuit should be disabled due to failure
- assertFalse(fs.getClient().getShortCircuitLocalReads());
+ if (legacyShortCircuitFails) {
+ assertFalse(fs.getClient().useLegacyBlockReaderLocal());
}
stm.close();
}
@@ -145,11 +171,13 @@ public class TestShortCircuitLocalRead {
/** Check the file content, reading as user {@code readingUser} */
static void checkFileContentDirect(URI uri, Path name, byte[] expected,
int readOffset, String readingUser, Configuration conf,
- boolean shortCircuitFails)
+ boolean legacyShortCircuitFails)
throws IOException, InterruptedException {
// Ensure short circuit is enabled
DistributedFileSystem fs = getFileSystem(readingUser, uri, conf);
- assertTrue(fs.getClient().getShortCircuitLocalReads());
+ if (legacyShortCircuitFails) {
+ assertTrue(fs.getClient().useLegacyBlockReaderLocal());
+ }
HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(name);
@@ -180,33 +208,45 @@ public class TestShortCircuitLocalRead {
nread += nbytes;
}
checkData(arrayFromByteBuffer(actual), readOffset, expected, "Read 3");
- if (shortCircuitFails) {
- // short circuit should be disabled due to failure
- assertFalse(fs.getClient().getShortCircuitLocalReads());
+ if (legacyShortCircuitFails) {
+ assertFalse(fs.getClient().useLegacyBlockReaderLocal());
}
stm.close();
}
+ public void doTestShortCircuitReadLegacy(boolean ignoreChecksum, int size,
+ int readOffset, String shortCircuitUser, String readingUser,
+ boolean legacyShortCircuitFails) throws IOException, InterruptedException {
+ doTestShortCircuitReadImpl(ignoreChecksum, size, readOffset,
+ shortCircuitUser, readingUser, legacyShortCircuitFails);
+ }
+
public void doTestShortCircuitRead(boolean ignoreChecksum, int size,
int readOffset) throws IOException, InterruptedException {
String shortCircuitUser = getCurrentUser();
- doTestShortCircuitRead(ignoreChecksum, size, readOffset, shortCircuitUser,
- shortCircuitUser, false);
+ doTestShortCircuitReadImpl(ignoreChecksum, size, readOffset,
+ null, getCurrentUser(), false);
}
/**
* Test that file data can be read by reading the block file
* directly from the local store.
*/
- public void doTestShortCircuitRead(boolean ignoreChecksum, int size,
+ public void doTestShortCircuitReadImpl(boolean ignoreChecksum, int size,
int readOffset, String shortCircuitUser, String readingUser,
- boolean shortCircuitFails) throws IOException, InterruptedException {
+ boolean legacyShortCircuitFails) throws IOException, InterruptedException {
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
ignoreChecksum);
- conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
- shortCircuitUser);
+ conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+ new File(sockDir.getDir(),
+ "TestShortCircuitLocalRead._PORT.sock").getAbsolutePath());
+ if (shortCircuitUser != null) {
+ conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+ shortCircuitUser);
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
+ }
if (simulatedStorage) {
SimulatedFSDataset.setFactory(conf);
}
@@ -228,9 +268,9 @@ public class TestShortCircuitLocalRead {
URI uri = cluster.getURI();
checkFileContent(uri, file1, fileData, readOffset, readingUser, conf,
- shortCircuitFails);
+ legacyShortCircuitFails);
checkFileContentDirect(uri, file1, fileData, readOffset, readingUser,
- conf, shortCircuitFails);
+ conf, legacyShortCircuitFails);
} finally {
fs.close();
cluster.shutdown();
@@ -255,6 +295,12 @@ public class TestShortCircuitLocalRead {
doTestShortCircuitRead(true, 13, 5);
}
+ @Test(timeout=10000)
+ public void testLocalReadLegacy() throws Exception {
+ doTestShortCircuitReadLegacy(true, 13, 0, getCurrentUser(),
+ getCurrentUser(), false);
+ }
+
/**
* Try a short circuit from a reader that is not allowed
* to use short circuit. The test ensures reader falls back to non
@@ -262,7 +308,7 @@ public class TestShortCircuitLocalRead {
*/
@Test(timeout=10000)
public void testLocalReadFallback() throws Exception {
- doTestShortCircuitRead(true, 13, 0, getCurrentUser(), "notallowed", true);
+ doTestShortCircuitReadLegacy(true, 13, 0, getCurrentUser(), "notallowed", true);
}
@Test(timeout=10000)
@@ -276,7 +322,7 @@ public class TestShortCircuitLocalRead {
doTestShortCircuitRead(false, 10*blockSize+100, 777);
doTestShortCircuitRead(true, 10*blockSize+100, 777);
}
-
+
private ClientDatanodeProtocol getProxy(UserGroupInformation ugi,
final DatanodeID dnInfo, final Configuration conf) throws IOException,
InterruptedException {
@@ -301,21 +347,15 @@ public class TestShortCircuitLocalRead {
}
@Test(timeout=10000)
- public void testGetBlockLocalPathInfo() throws IOException, InterruptedException {
+ public void testDeprecatedGetBlockLocalPathInfoRpc()
+ throws IOException, InterruptedException {
final Configuration conf = new Configuration();
- conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
- "alloweduser1,alloweduser2");
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.format(true).build();
cluster.waitActive();
- final DataNode dn = cluster.getDataNodes().get(0);
FileSystem fs = cluster.getFileSystem();
try {
DFSTestUtil.createFile(fs, new Path("/tmp/x"), 16, (short) 1, 23);
- UserGroupInformation aUgi1 =
- UserGroupInformation.createRemoteUser("alloweduser1");
- UserGroupInformation aUgi2 =
- UserGroupInformation.createRemoteUser("alloweduser2");
LocatedBlocks lb = cluster.getNameNode().getRpcServer()
.getBlockLocations("/tmp/x", 0, 16);
// Create a new block object, because the block inside LocatedBlock at
@@ -323,29 +363,11 @@ public class TestShortCircuitLocalRead {
ExtendedBlock blk = new ExtendedBlock(lb.get(0).getBlock());
Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken();
final DatanodeInfo dnInfo = lb.get(0).getLocations()[0];
- ClientDatanodeProtocol proxy = getProxy(aUgi1, dnInfo, conf);
- // This should succeed
- BlockLocalPathInfo blpi = proxy.getBlockLocalPathInfo(blk, token);
- Assert.assertEquals(
- DataNodeTestUtils.getFSDataset(dn).getBlockLocalPathInfo(blk).getBlockPath(),
- blpi.getBlockPath());
-
- // Try with the other allowed user
- proxy = getProxy(aUgi2, dnInfo, conf);
-
- // This should succeed as well
- blpi = proxy.getBlockLocalPathInfo(blk, token);
- Assert.assertEquals(
- DataNodeTestUtils.getFSDataset(dn).getBlockLocalPathInfo(blk).getBlockPath(),
- blpi.getBlockPath());
-
- // Now try with a disallowed user
- UserGroupInformation bUgi = UserGroupInformation
- .createRemoteUser("notalloweduser");
- proxy = getProxy(bUgi, dnInfo, conf);
+ ClientDatanodeProtocol proxy =
+ DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf, 60000, false);
try {
proxy.getBlockLocalPathInfo(blk, token);
- Assert.fail("The call should have failed as " + bUgi.getShortUserName()
+ Assert.fail("The call should have failed as this user "
+ " is not allowed to call getBlockLocalPathInfo");
} catch (IOException ex) {
Assert.assertTrue(ex.getMessage().contains(
@@ -363,8 +385,9 @@ public class TestShortCircuitLocalRead {
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
- conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
- getCurrentUser());
+ conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+ "/tmp/testSkipWithVerifyChecksum._PORT");
+ DomainSocket.disableBindPathValidation();
if (simulatedStorage) {
SimulatedFSDataset.setFactory(conf);
}
@@ -402,6 +425,88 @@ public class TestShortCircuitLocalRead {
cluster.shutdown();
}
}
+
+ /**
+  * Truncate a block file to zero length behind the datanode's back and check
+  * that reading it fails with an IOException, while a second, untouched file
+  * on the same cluster can still be read in full afterwards.
+  *
+  * @throws IOException if the cluster cannot be set up or an unexpected read
+  *         fails
+  */
+ @Test
+ public void testHandleTruncatedBlockFile() throws IOException {
+   MiniDFSCluster cluster = null;
+   HdfsConfiguration conf = new HdfsConfiguration();
+   conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+   conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
+   // Place the domain socket in the class-wide temporary socket directory,
+   // the same way doTestShortCircuitReadImpl does, instead of the shared /tmp.
+   conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+       new File(sockDir.getDir(),
+         "testHandleTruncatedBlockFile._PORT.sock").getAbsolutePath());
+   conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
+   final Path TEST_PATH = new Path("/a");
+   final Path TEST_PATH2 = new Path("/b");
+   final long RANDOM_SEED = 4567L;
+   final long RANDOM_SEED2 = 4568L;
+   FSDataInputStream fsIn = null;
+   final int TEST_LENGTH = 3456;
+
+   try {
+     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+     cluster.waitActive();
+     FileSystem fs = cluster.getFileSystem();
+     DFSTestUtil.createFile(fs, TEST_PATH,
+         TEST_LENGTH, (short)1, RANDOM_SEED);
+     DFSTestUtil.createFile(fs, TEST_PATH2,
+         TEST_LENGTH, (short)1, RANDOM_SEED2);
+     // Remember the contents of the second file so they can be compared
+     // after the first file's block has been damaged.
+     fsIn = cluster.getFileSystem().open(TEST_PATH2);
+     byte original[] = new byte[TEST_LENGTH];
+     IOUtils.readFully(fsIn, original, 0, TEST_LENGTH);
+     fsIn.close();
+     fsIn = null;
+     try {
+       DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+     } catch (InterruptedException e) {
+       Assert.fail("unexpected InterruptedException during " +
+           "waitReplication: " + e);
+     } catch (TimeoutException e) {
+       Assert.fail("unexpected TimeoutException during " +
+           "waitReplication: " + e);
+     }
+     ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+     File dataFile = MiniDFSCluster.getBlockFile(0, block);
+     // Stop the cluster before touching the block file on disk.
+     cluster.shutdown();
+     cluster = null;
+     RandomAccessFile raf = null;
+     try {
+       raf = new RandomAccessFile(dataFile, "rw");
+       raf.setLength(0);
+     } finally {
+       if (raf != null) raf.close();
+     }
+     // Restart on the existing storage (format(false)) so the truncated
+     // block file is what the datanode serves.
+     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(false).build();
+     cluster.waitActive();
+     fs = cluster.getFileSystem();
+     fsIn = fs.open(TEST_PATH);
+     try {
+       byte buf[] = new byte[100];
+       fsIn.seek(2000);
+       fsIn.readFully(buf, 0, buf.length);
+       Assert.fail("shouldn't be able to read from corrupt 0-length " +
+           "block file.");
+     } catch (IOException e) {
+       // Expected: the read of the truncated block must not succeed.
+       DFSClient.LOG.error("caught exception ", e);
+     }
+     fsIn.close();
+     fsIn = null;
+
+     // We should still be able to read the other file.
+     // This is important because it indicates that we detected that the
+     // previous block was corrupt, rather than blaming the problem on
+     // communication.
+     fsIn = fs.open(TEST_PATH2);
+     byte buf[] = new byte[original.length];
+     fsIn.readFully(buf, 0, buf.length);
+     TestBlockReaderLocal.assertArrayRegionsEqual(original, 0, buf, 0,
+         original.length);
+     fsIn.close();
+     fsIn = null;
+   } finally {
+     if (fsIn != null) fsIn.close();
+     if (cluster != null) cluster.shutdown();
+   }
+ }
/**
* Test to run benchmarks between short circuit read vs regular read with
@@ -424,6 +529,8 @@ public class TestShortCircuitLocalRead {
// Setup create a file
final Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, shortcircuit);
+ conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+ "/tmp/TestShortCircuitLocalRead._PORT");
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
checksum);
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java Sat Apr 13 02:13:59 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -145,8 +146,9 @@ public class TestBlockTokenWithDFS {
String file = BlockReaderFactory.getFileName(targetAddr,
"test-blockpoolid", block.getBlockId());
blockReader = BlockReaderFactory.newBlockReader(
- conf, s, file, block,
- lblock.getBlockToken(), 0, -1, null);
+ conf, file, block, lblock.getBlockToken(), 0, -1,
+ true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s),
+ nodes[0], null, false);
} catch (IOException ex) {
if (ex instanceof InvalidBlockTokenException) {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Sat Apr 13 02:13:59 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -962,6 +963,12 @@ public class SimulatedFSDataset implemen
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block)
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Sat Apr 13 02:13:59 2013
@@ -32,11 +32,13 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -280,10 +282,11 @@ public class TestDataNodeVolumeFailure {
String file = BlockReaderFactory.getFileName(targetAddr,
"test-blockpoolid",
block.getBlockId());
- BlockReaderFactory.newBlockReader(conf, s, file, block, lblock
- .getBlockToken(), 0, -1, null);
-
- // nothing - if it fails - it will throw and exception
+ BlockReader blockReader =
+ BlockReaderFactory.newBlockReader(conf, file, block,
+ lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure",
+ TcpPeerServer.peerFromSocket(s), datanode, null, false);
+ blockReader.close(null, null);
}
/**