You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ar...@apache.org on 2013/12/19 03:04:10 UTC
svn commit: r1552205 [5/6] - in
/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project:
hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/
hadoop-hdfs/dev-support/ hadoop-hdfs/src/main/java/
hadoop-hdfs/src/main/java/org/apache/...
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java Thu Dec 19 02:03:47 2013
@@ -2143,17 +2143,14 @@ public class MiniDFSCluster {
}
/**
- * Get a storage directory for a datanode. There are two storage directories
- * per datanode:
+ * Get a storage directory for a datanode.
* <ol>
* <li><base directory>/data/data<2*dnIndex + 1></li>
* <li><base directory>/data/data<2*dnIndex + 2></li>
* </ol>
*
* @param dnIndex datanode index (starts from 0)
- * @param dirIndex directory index (0 or 1). Index 0 provides access to the
- * first storage directory. Index 1 provides access to the second
- * storage directory.
+ * @param dirIndex directory index.
* @return Storage directory
*/
public static File getStorageDir(int dnIndex, int dirIndex) {
@@ -2164,7 +2161,7 @@ public class MiniDFSCluster {
* Calculate the DN instance-specific path for appending to the base dir
* to determine the location of the storage of a DN instance in the mini cluster
* @param dnIndex datanode index
- * @param dirIndex directory index (0 or 1).
+ * @param dirIndex directory index.
* @return
*/
private static String getStorageDirPath(int dnIndex, int dirIndex) {
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java Thu Dec 19 02:03:47 2013
@@ -32,6 +32,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
@@ -92,22 +94,35 @@ public class TestBlockReaderLocal {
}
}
- private static interface BlockReaderLocalTest {
- final int TEST_LENGTH = 12345;
+ private static class BlockReaderLocalTest {
+ final static int TEST_LENGTH = 12345;
+ final static int BYTES_PER_CHECKSUM = 512;
+
+ public void setConfiguration(HdfsConfiguration conf) {
+ // default: no-op
+ }
public void setup(File blockFile, boolean usingChecksums)
- throws IOException;
+ throws IOException {
+ // default: no-op
+ }
public void doTest(BlockReaderLocal reader, byte original[])
- throws IOException;
+ throws IOException {
+ // default: no-op
+ }
}
public void runBlockReaderLocalTest(BlockReaderLocalTest test,
- boolean checksum) throws IOException {
+ boolean checksum, long readahead) throws IOException {
MiniDFSCluster cluster = null;
HdfsConfiguration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.
DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, !checksum);
+ conf.setLong(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
+ BlockReaderLocalTest.BYTES_PER_CHECKSUM);
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
- FileInputStream dataIn = null, checkIn = null;
+ conf.setLong(DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD, readahead);
+ test.setConfiguration(conf);
+ FileInputStream dataIn = null, metaIn = null;
final Path TEST_PATH = new Path("/a");
final long RANDOM_SEED = 4567L;
BlockReaderLocal blockReaderLocal = null;
@@ -143,45 +158,51 @@ public class TestBlockReaderLocal {
cluster.shutdown();
cluster = null;
test.setup(dataFile, checksum);
- dataIn = new FileInputStream(dataFile);
- checkIn = new FileInputStream(metaFile);
- blockReaderLocal = new BlockReaderLocal(new DFSClient.Conf(conf),
- TEST_PATH.getName(), block, 0, -1,
- dataIn, checkIn, datanodeID, checksum, null);
+ FileInputStream streams[] = {
+ new FileInputStream(dataFile),
+ new FileInputStream(metaFile)
+ };
+ dataIn = streams[0];
+ metaIn = streams[1];
+ blockReaderLocal = new BlockReaderLocal.Builder(
+ new DFSClient.Conf(conf)).
+ setFilename(TEST_PATH.getName()).
+ setBlock(block).
+ setStreams(streams).
+ setDatanodeID(datanodeID).
+ setCachingStrategy(new CachingStrategy(false, readahead)).
+ setVerifyChecksum(checksum).
+ setBlockMetadataHeader(BlockMetadataHeader.preadHeader(
+ metaIn.getChannel())).
+ build();
dataIn = null;
- checkIn = null;
+ metaIn = null;
test.doTest(blockReaderLocal, original);
+ // BlockReaderLocal should not alter the file position.
+ Assert.assertEquals(0, streams[0].getChannel().position());
+ Assert.assertEquals(0, streams[1].getChannel().position());
} finally {
if (fsIn != null) fsIn.close();
if (fs != null) fs.close();
if (cluster != null) cluster.shutdown();
if (dataIn != null) dataIn.close();
- if (checkIn != null) checkIn.close();
+ if (metaIn != null) metaIn.close();
if (blockReaderLocal != null) blockReaderLocal.close();
}
}
private static class TestBlockReaderLocalImmediateClose
- implements BlockReaderLocalTest {
- @Override
- public void setup(File blockFile, boolean usingChecksums)
- throws IOException { }
- @Override
- public void doTest(BlockReaderLocal reader, byte original[])
- throws IOException { }
+ extends BlockReaderLocalTest {
}
@Test
public void testBlockReaderLocalImmediateClose() throws IOException {
- runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true);
- runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false);
+ runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true, 0);
+ runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false, 0);
}
private static class TestBlockReaderSimpleReads
- implements BlockReaderLocalTest {
- @Override
- public void setup(File blockFile, boolean usingChecksums)
- throws IOException { }
+ extends BlockReaderLocalTest {
@Override
public void doTest(BlockReaderLocal reader, byte original[])
throws IOException {
@@ -194,24 +215,43 @@ public class TestBlockReaderLocal {
assertArrayRegionsEqual(original, 1024, buf, 1024, 513);
reader.readFully(buf, 1537, 514);
assertArrayRegionsEqual(original, 1537, buf, 1537, 514);
+ // Readahead is always at least the size of one chunk in this test.
+ Assert.assertTrue(reader.getMaxReadaheadLength() >=
+ BlockReaderLocalTest.BYTES_PER_CHECKSUM);
}
}
@Test
public void testBlockReaderSimpleReads() throws IOException {
- runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true);
+ runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true,
+ DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+ }
+
+ @Test
+ public void testBlockReaderSimpleReadsShortReadahead() throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true,
+ BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1);
}
@Test
public void testBlockReaderSimpleReadsNoChecksum() throws IOException {
- runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false);
+ runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false,
+ DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+ }
+
+ @Test
+ public void testBlockReaderSimpleReadsNoReadahead() throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true, 0);
+ }
+
+ @Test
+ public void testBlockReaderSimpleReadsNoChecksumNoReadahead()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false, 0);
}
private static class TestBlockReaderLocalArrayReads2
- implements BlockReaderLocalTest {
- @Override
- public void setup(File blockFile, boolean usingChecksums)
- throws IOException { }
+ extends BlockReaderLocalTest {
@Override
public void doTest(BlockReaderLocal reader, byte original[])
throws IOException {
@@ -234,21 +274,30 @@ public class TestBlockReaderLocal {
@Test
public void testBlockReaderLocalArrayReads2() throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
- true);
+ true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
}
@Test
public void testBlockReaderLocalArrayReads2NoChecksum()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
- false);
+ false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+ }
+
+ @Test
+ public void testBlockReaderLocalArrayReads2NoReadahead()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), true, 0);
+ }
+
+ @Test
+ public void testBlockReaderLocalArrayReads2NoChecksumNoReadahead()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), false, 0);
}
private static class TestBlockReaderLocalByteBufferReads
- implements BlockReaderLocalTest {
- @Override
- public void setup(File blockFile, boolean usingChecksums)
- throws IOException { }
+ extends BlockReaderLocalTest {
@Override
public void doTest(BlockReaderLocal reader, byte original[])
throws IOException {
@@ -268,19 +317,105 @@ public class TestBlockReaderLocal {
@Test
public void testBlockReaderLocalByteBufferReads()
throws IOException {
- runBlockReaderLocalTest(
- new TestBlockReaderLocalByteBufferReads(), true);
+ runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
+ true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
}
@Test
public void testBlockReaderLocalByteBufferReadsNoChecksum()
throws IOException {
runBlockReaderLocalTest(
- new TestBlockReaderLocalByteBufferReads(), false);
+ new TestBlockReaderLocalByteBufferReads(),
+ false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+ }
+
+ @Test
+ public void testBlockReaderLocalByteBufferReadsNoReadahead()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
+ true, 0);
+ }
+
+ @Test
+ public void testBlockReaderLocalByteBufferReadsNoChecksumNoReadahead()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
+ false, 0);
+ }
+
+ /**
+ * Test reads that bypass the bounce buffer (because they are aligned
+ * and bigger than the readahead).
+ */
+ private static class TestBlockReaderLocalByteBufferFastLaneReads
+ extends BlockReaderLocalTest {
+ @Override
+ public void doTest(BlockReaderLocal reader, byte original[])
+ throws IOException {
+ ByteBuffer buf = ByteBuffer.allocateDirect(TEST_LENGTH);
+ readFully(reader, buf, 0, 5120);
+ buf.flip();
+ assertArrayRegionsEqual(original, 0,
+ DFSTestUtil.asArray(buf), 0,
+ 5120);
+ reader.skip(1537);
+ readFully(reader, buf, 0, 1);
+ buf.flip();
+ assertArrayRegionsEqual(original, 6657,
+ DFSTestUtil.asArray(buf), 0,
+ 1);
+ reader.setMlocked(true);
+ readFully(reader, buf, 0, 5120);
+ buf.flip();
+ assertArrayRegionsEqual(original, 6658,
+ DFSTestUtil.asArray(buf), 0,
+ 5120);
+ reader.setMlocked(false);
+ readFully(reader, buf, 0, 513);
+ buf.flip();
+ assertArrayRegionsEqual(original, 11778,
+ DFSTestUtil.asArray(buf), 0,
+ 513);
+ reader.skip(3);
+ readFully(reader, buf, 0, 50);
+ buf.flip();
+ assertArrayRegionsEqual(original, 12294,
+ DFSTestUtil.asArray(buf), 0,
+ 50);
+ }
}
+ @Test
+ public void testBlockReaderLocalByteBufferFastLaneReads()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(),
+ true, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM);
+ }
+
+ @Test
+ public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksum()
+ throws IOException {
+ runBlockReaderLocalTest(
+ new TestBlockReaderLocalByteBufferFastLaneReads(),
+ false, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM);
+ }
+
+ @Test
+ public void testBlockReaderLocalByteBufferFastLaneReadsNoReadahead()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(),
+ true, 0);
+ }
+
+ @Test
+ public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksumNoReadahead()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(),
+ false, 0);
+ }
+
private static class TestBlockReaderLocalReadCorruptStart
- implements BlockReaderLocalTest {
+ extends BlockReaderLocalTest {
boolean usingChecksums = false;
@Override
public void setup(File blockFile, boolean usingChecksums)
@@ -314,11 +449,12 @@ public class TestBlockReaderLocal {
@Test
public void testBlockReaderLocalReadCorruptStart()
throws IOException {
- runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true);
+ runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true,
+ DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
}
private static class TestBlockReaderLocalReadCorrupt
- implements BlockReaderLocalTest {
+ extends BlockReaderLocalTest {
boolean usingChecksums = false;
@Override
public void setup(File blockFile, boolean usingChecksums)
@@ -364,8 +500,136 @@ public class TestBlockReaderLocal {
@Test
public void testBlockReaderLocalReadCorrupt()
throws IOException {
- runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true);
- runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false);
+ runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true,
+ DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+ }
+
+ @Test
+ public void testBlockReaderLocalReadCorruptNoChecksum()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false,
+ DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+ }
+
+ @Test
+ public void testBlockReaderLocalReadCorruptNoReadahead()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true, 0);
+ }
+
+ @Test
+ public void testBlockReaderLocalReadCorruptNoChecksumNoReadahead()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false, 0);
+ }
+
+ private static class TestBlockReaderLocalWithMlockChanges
+ extends BlockReaderLocalTest {
+ @Override
+ public void setup(File blockFile, boolean usingChecksums)
+ throws IOException {
+ }
+
+ @Override
+ public void doTest(BlockReaderLocal reader, byte original[])
+ throws IOException {
+ ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
+ reader.skip(1);
+ readFully(reader, buf, 1, 9);
+ assertArrayRegionsEqual(original, 1, buf.array(), 1, 9);
+ readFully(reader, buf, 10, 100);
+ assertArrayRegionsEqual(original, 10, buf.array(), 10, 100);
+ reader.setMlocked(true);
+ readFully(reader, buf, 110, 700);
+ assertArrayRegionsEqual(original, 110, buf.array(), 110, 700);
+ reader.setMlocked(false);
+ reader.skip(1); // skip from offset 810 to offset 811
+ readFully(reader, buf, 811, 5);
+ assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
+ }
+ }
+
+ @Test
+ public void testBlockReaderLocalWithMlockChanges()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
+ true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+ }
+
+ @Test
+ public void testBlockReaderLocalWithMlockChangesNoChecksum()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
+ false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+ }
+
+ @Test
+ public void testBlockReaderLocalWithMlockChangesNoReadahead()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
+ true, 0);
+ }
+
+ @Test
+ public void testBlockReaderLocalWithMlockChangesNoChecksumNoReadahead()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
+ false, 0);
+ }
+
+ private static class TestBlockReaderLocalOnFileWithoutChecksum
+ extends BlockReaderLocalTest {
+ @Override
+ public void setConfiguration(HdfsConfiguration conf) {
+ conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "NULL");
+ }
+
+ @Override
+ public void doTest(BlockReaderLocal reader, byte original[])
+ throws IOException {
+ Assert.assertTrue(!reader.getVerifyChecksum());
+ ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
+ reader.skip(1);
+ readFully(reader, buf, 1, 9);
+ assertArrayRegionsEqual(original, 1, buf.array(), 1, 9);
+ readFully(reader, buf, 10, 100);
+ assertArrayRegionsEqual(original, 10, buf.array(), 10, 100);
+ reader.setMlocked(true);
+ readFully(reader, buf, 110, 700);
+ assertArrayRegionsEqual(original, 110, buf.array(), 110, 700);
+ reader.setMlocked(false);
+ reader.skip(1); // skip from offset 810 to offset 811
+ readFully(reader, buf, 811, 5);
+ assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
+ }
+ }
+
+ @Test
+ public void testBlockReaderLocalOnFileWithoutChecksum()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
+ true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+ }
+
+ @Test
+ public void testBlockReaderLocalOnFileWithoutChecksumNoChecksum()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
+ false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+ }
+
+ @Test
+ public void testBlockReaderLocalOnFileWithoutChecksumNoReadahead()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
+ true, 0);
+ }
+
+ @Test
+ public void testBlockReaderLocalOnFileWithoutChecksumNoChecksumNoReadahead()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
+ false, 0);
}
@Test(timeout=60000)
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java Thu Dec 19 02:03:47 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.security.token.Token;
import org.junit.Assert;
@@ -138,7 +139,8 @@ public class TestConnCache {
Matchers.anyLong(),
Matchers.anyInt(),
Matchers.anyBoolean(),
- Matchers.anyString());
+ Matchers.anyString(),
+ (CachingStrategy)Matchers.anyObject());
// Initial read
pread(in, 0, dataBuf, 0, dataBuf.length, authenticData);
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java Thu Dec 19 02:03:47 2013
@@ -337,6 +337,58 @@ public class TestDecommission {
}
/**
+ * Tests decommission when replicas on the target datanode cannot be migrated
+ * to other datanodes to satisfy the replication factor. Make sure the
+ * datanode won't get stuck in the decommissioning state.
+ */
+ @Test(timeout = 360000)
+ public void testDecommission2() throws IOException {
+ LOG.info("Starting test testDecommission");
+ int numNamenodes = 1;
+ int numDatanodes = 4;
+ conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 3);
+ startCluster(numNamenodes, numDatanodes, conf);
+
+ ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList = new ArrayList<ArrayList<DatanodeInfo>>(
+ numNamenodes);
+ namenodeDecomList.add(0, new ArrayList<DatanodeInfo>(numDatanodes));
+
+ Path file1 = new Path("testDecommission2.dat");
+ int replicas = 4;
+
+ // Start decommissioning one namenode at a time
+ ArrayList<DatanodeInfo> decommissionedNodes = namenodeDecomList.get(0);
+ FileSystem fileSys = cluster.getFileSystem(0);
+ FSNamesystem ns = cluster.getNamesystem(0);
+
+ writeFile(fileSys, file1, replicas);
+
+ int deadDecomissioned = ns.getNumDecomDeadDataNodes();
+ int liveDecomissioned = ns.getNumDecomLiveDataNodes();
+
+ // Decommission one node. Verify that node is decommissioned.
+ DatanodeInfo decomNode = decommissionNode(0, decommissionedNodes,
+ AdminStates.DECOMMISSIONED);
+ decommissionedNodes.add(decomNode);
+ assertEquals(deadDecomissioned, ns.getNumDecomDeadDataNodes());
+ assertEquals(liveDecomissioned + 1, ns.getNumDecomLiveDataNodes());
+
+ // Ensure decommissioned datanode is not automatically shutdown
+ DFSClient client = getDfsClient(cluster.getNameNode(0), conf);
+ assertEquals("All datanodes must be alive", numDatanodes,
+ client.datanodeReport(DatanodeReportType.LIVE).length);
+ assertNull(checkFile(fileSys, file1, replicas, decomNode.getXferAddr(),
+ numDatanodes));
+ cleanupFile(fileSys, file1);
+
+ // Restart the cluster and ensure recommissioned datanodes
+ // are allowed to register with the namenode
+ cluster.shutdown();
+ startCluster(1, 4, conf);
+ cluster.shutdown();
+ }
+
+ /**
* Tests recommission for non federated cluster
*/
@Test(timeout=360000)
@@ -388,7 +440,20 @@ public class TestDecommission {
DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
assertEquals("All datanodes must be alive", numDatanodes,
client.datanodeReport(DatanodeReportType.LIVE).length);
- assertNull(checkFile(fileSys, file1, replicas, decomNode.getXferAddr(), numDatanodes));
+ // wait for the block to be replicated
+ int tries = 0;
+ while (tries++ < 20) {
+ try {
+ Thread.sleep(1000);
+ if (checkFile(fileSys, file1, replicas, decomNode.getXferAddr(),
+ numDatanodes) == null) {
+ break;
+ }
+ } catch (InterruptedException ie) {
+ }
+ }
+ assertTrue("Checked if block was replicated after decommission, tried "
+ + tries + " times.", tries < 20);
cleanupFile(fileSys, file1);
}
}
@@ -429,12 +494,25 @@ public class TestDecommission {
DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
assertEquals("All datanodes must be alive", numDatanodes,
client.datanodeReport(DatanodeReportType.LIVE).length);
- assertNull(checkFile(fileSys, file1, replicas, decomNode.getXferAddr(), numDatanodes));
+ int tries =0;
+ // wait for the block to be replicated
+ while (tries++ < 20) {
+ try {
+ Thread.sleep(1000);
+ if (checkFile(fileSys, file1, replicas, decomNode.getXferAddr(),
+ numDatanodes) == null) {
+ break;
+ }
+ } catch (InterruptedException ie) {
+ }
+ }
+ assertTrue("Checked if block was replicated after decommission, tried "
+ + tries + " times.", tries < 20);
// stop decommission and check if the new replicas are removed
recomissionNode(decomNode);
// wait for the block to be deleted
- int tries = 0;
+ tries = 0;
while (tries++ < 20) {
try {
Thread.sleep(1000);
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java Thu Dec 19 02:03:47 2013
@@ -259,7 +259,6 @@ public class TestShortCircuitLocalRead {
assertTrue("/ should be a directory", fs.getFileStatus(path)
.isDirectory() == true);
- // create a new file in home directory. Do not close it.
byte[] fileData = AppendTestUtil.randomBytes(seed, size);
Path file1 = fs.makeQualified(new Path("filelocal.dat"));
FSDataOutputStream stm = createFile(fs, file1, 1);
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java Thu Dec 19 02:03:47 2013
@@ -71,7 +71,15 @@ import org.mockito.invocation.Invocation
/**
* This test simulates a variety of situations when blocks are being
* intentionally corrupted, unexpectedly modified, and so on before a block
- * report is happening
+ * report is happening.
+ *
+ * For each test case it runs two variations:
+ * #1 - For a given DN, the first variation sends block reports for all
+ * storages in a single call to the NN.
+ * #2 - For a given DN, the second variation sends block reports for each
+ * storage in a separate call.
+ *
+ * The behavior should be the same in either variation.
*/
public class TestBlockReport {
public static final Log LOG = LogFactory.getLog(TestBlockReport.class);
@@ -158,14 +166,120 @@ public class TestBlockReport {
}
/**
+ * Utility routine to send block reports to the NN, either in a single call
+ * or reporting one storage per call.
+ *
+ * @param dnR
+ * @param poolId
+ * @param reports
+ * @param needtoSplit
+ * @throws IOException
+ */
+ private void sendBlockReports(DatanodeRegistration dnR, String poolId,
+ StorageBlockReport[] reports, boolean needtoSplit) throws IOException {
+ if (!needtoSplit) {
+ LOG.info("Sending combined block reports for " + dnR);
+ cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
+ } else {
+ for (StorageBlockReport report : reports) {
+ LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
+ StorageBlockReport[] singletonReport = { report };
+ cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport);
+ }
+ }
+ }
+
+ /**
+ * Test variations blockReport_01 through blockReport_09 with combined
+ * and split block reports.
+ */
+ @Test
+ public void blockReportCombined_01() throws IOException {
+ blockReport_01(false);
+ }
+
+ @Test
+ public void blockReportSplit_01() throws IOException {
+ blockReport_01(true);
+ }
+
+ @Test
+ public void blockReportCombined_02() throws IOException {
+ blockReport_02(false);
+ }
+
+ @Test
+ public void blockReportSplit_02() throws IOException {
+ blockReport_02(true);
+ }
+
+ @Test
+ public void blockReportCombined_03() throws IOException {
+ blockReport_03(false);
+ }
+
+ @Test
+ public void blockReportSplit_03() throws IOException {
+ blockReport_03(true);
+ }
+
+ @Test
+ public void blockReportCombined_04() throws IOException {
+ blockReport_04(false);
+ }
+
+ @Test
+ public void blockReportSplit_04() throws IOException {
+ blockReport_04(true);
+ }
+
+ @Test
+ public void blockReportCombined_06() throws Exception {
+ blockReport_06(false);
+ }
+
+ @Test
+ public void blockReportSplit_06() throws Exception {
+ blockReport_06(true);
+ }
+
+ @Test
+ public void blockReportCombined_07() throws Exception {
+ blockReport_07(false);
+ }
+
+ @Test
+ public void blockReportSplit_07() throws Exception {
+ blockReport_07(true);
+ }
+
+ @Test
+ public void blockReportCombined_08() throws Exception {
+ blockReport_08(false);
+ }
+
+ @Test
+ public void blockReportSplit_08() throws Exception {
+ blockReport_08(true);
+ }
+
+ @Test
+ public void blockReportCombined_09() throws Exception {
+ blockReport_09(false);
+ }
+
+ @Test
+ public void blockReportSplit_09() throws Exception {
+ blockReport_09(true);
+ }
+ /**
* Tests writing a file, verifying it, and closing it. Then the lengths of
* the blocks are messed up and a BlockReport is forced.
* The modification of the blocks' lengths has to be ignored.
*
* @throws java.io.IOException on an error
*/
- @Test
- public void blockReport_01() throws IOException {
+ private void blockReport_01(boolean splitBlockReports) throws IOException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat");
@@ -198,7 +312,7 @@ public class TestBlockReport {
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
- cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
+ sendBlockReports(dnR, poolId, reports, splitBlockReports);
List<LocatedBlock> blocksAfterReport =
DFSTestUtil.getAllBlocks(fs.open(filePath));
@@ -224,8 +338,7 @@ public class TestBlockReport {
*
* @throws IOException in case of errors
*/
- @Test
- public void blockReport_02() throws IOException {
+ private void blockReport_02(boolean splitBlockReports) throws IOException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
LOG.info("Running test " + METHOD_NAME);
@@ -280,7 +393,7 @@ public class TestBlockReport {
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn0.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn0, poolId, false, false);
- cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
+ sendBlockReports(dnR, poolId, reports, splitBlockReports);
BlockManagerTestUtil.getComputedDatanodeWork(cluster.getNamesystem()
.getBlockManager());
@@ -301,8 +414,7 @@ public class TestBlockReport {
*
* @throws IOException in case of an error
*/
- @Test
- public void blockReport_03() throws IOException {
+ private void blockReport_03(boolean splitBlockReports) throws IOException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat");
ArrayList<Block> blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath);
@@ -312,11 +424,7 @@ public class TestBlockReport {
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false);
- DatanodeCommand dnCmd =
- cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
- if(LOG.isDebugEnabled()) {
- LOG.debug("Got the command: " + dnCmd);
- }
+ sendBlockReports(dnR, poolId, reports, splitBlockReports);
printStats();
assertThat("Wrong number of corrupt blocks",
@@ -333,8 +441,7 @@ public class TestBlockReport {
*
* @throws IOException in case of an error
*/
- @Test
- public void blockReport_04() throws IOException {
+ private void blockReport_04(boolean splitBlockReports) throws IOException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat");
DFSTestUtil.createFile(fs, filePath,
@@ -352,11 +459,7 @@ public class TestBlockReport {
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
- DatanodeCommand dnCmd =
- cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
- if(LOG.isDebugEnabled()) {
- LOG.debug("Got the command: " + dnCmd);
- }
+ sendBlockReports(dnR, poolId, reports, splitBlockReports);
printStats();
assertThat("Wrong number of corrupt blocks",
@@ -373,8 +476,7 @@ public class TestBlockReport {
*
* @throws IOException in case of an error
*/
- @Test
- public void blockReport_06() throws Exception {
+ private void blockReport_06(boolean splitBlockReports) throws Exception {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat");
final int DN_N1 = DN_N0 + 1;
@@ -387,7 +489,7 @@ public class TestBlockReport {
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
- cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
+ sendBlockReports(dnR, poolId, reports, splitBlockReports);
printStats();
assertEquals("Wrong number of PendingReplication Blocks",
0, cluster.getNamesystem().getUnderReplicatedBlocks());
@@ -406,8 +508,7 @@ public class TestBlockReport {
*
* @throws IOException in case of an error
*/
- @Test
- public void blockReport_07() throws Exception {
+ private void blockReport_07(boolean splitBlockReports) throws Exception {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat");
final int DN_N1 = DN_N0 + 1;
@@ -421,7 +522,7 @@ public class TestBlockReport {
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false);
- cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
+ sendBlockReports(dnR, poolId, reports, splitBlockReports);
printStats();
assertThat("Wrong number of corrupt blocks",
@@ -432,7 +533,7 @@ public class TestBlockReport {
cluster.getNamesystem().getPendingReplicationBlocks(), is(0L));
reports = getBlockReports(dn, poolId, true, true);
- cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
+ sendBlockReports(dnR, poolId, reports, splitBlockReports);
printStats();
assertThat("Wrong number of corrupt blocks",
@@ -458,8 +559,7 @@ public class TestBlockReport {
*
* @throws IOException in case of an error
*/
- @Test
- public void blockReport_08() throws IOException {
+ private void blockReport_08(boolean splitBlockReports) throws IOException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat");
final int DN_N1 = DN_N0 + 1;
@@ -483,8 +583,8 @@ public class TestBlockReport {
DataNode dn = cluster.getDataNodes().get(DN_N1);
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
- StorageBlockReport[] report = getBlockReports(dn, poolId, false, false);
- cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
+ StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
+ sendBlockReports(dnR, poolId, reports, splitBlockReports);
printStats();
assertEquals("Wrong number of PendingReplication blocks",
blocks.size(), cluster.getNamesystem().getPendingReplicationBlocks());
@@ -500,8 +600,7 @@ public class TestBlockReport {
// Similar to BlockReport_08 but corrupts GS and len of the TEMPORARY's
// replica block. Expect the same behaviour: NN should simply ignore this
// block
- @Test
- public void blockReport_09() throws IOException {
+ private void blockReport_09(boolean splitBlockReports) throws IOException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat");
final int DN_N1 = DN_N0 + 1;
@@ -526,8 +625,8 @@ public class TestBlockReport {
DataNode dn = cluster.getDataNodes().get(DN_N1);
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
- StorageBlockReport[] report = getBlockReports(dn, poolId, true, true);
- cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
+ StorageBlockReport[] reports = getBlockReports(dn, poolId, true, true);
+ sendBlockReports(dnR, poolId, reports, splitBlockReports);
printStats();
assertEquals("Wrong number of PendingReplication blocks",
2, cluster.getNamesystem().getPendingReplicationBlocks());
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java Thu Dec 19 02:03:47 2013
@@ -239,7 +239,7 @@ public class OfflineEditsViewerHelper {
.setOwnerName("carlton")
.setGroupName("party")
.setMode(new FsPermission((short)0700))
- .setWeight(1989));
+ .setLimit(1989l));
// OP_ADD_PATH_BASED_CACHE_DIRECTIVE 33
long id = dfs.addCacheDirective(
new CacheDirectiveInfo.Builder().
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java Thu Dec 19 02:03:47 2013
@@ -34,6 +34,7 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Date;
+import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -43,6 +44,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
@@ -92,25 +94,48 @@ public class TestCacheDirectives {
static private MiniDFSCluster cluster;
static private DistributedFileSystem dfs;
static private NamenodeProtocols proto;
+ static private NameNode namenode;
static private CacheManipulator prevCacheManipulator;
static {
EditLogFileOutputStream.setShouldSkipFsyncForTesting(false);
}
- @Before
- public void setup() throws Exception {
- conf = new HdfsConfiguration();
+ private static final long BLOCK_SIZE = 512;
+ private static final int NUM_DATANODES = 4;
+ // Most Linux installs will allow non-root users to lock 64KB.
+ // In this test though, we stub out mlock so this doesn't matter.
+ private static final long CACHE_CAPACITY = 64 * 1024 / NUM_DATANODES;
+
+ private static HdfsConfiguration createCachingConf() {
+ HdfsConfiguration conf = new HdfsConfiguration();
+ conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, CACHE_CAPACITY);
+ conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
+ conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, true);
+ conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
+ conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
// set low limits here for testing purposes
conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, 2);
- conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES, 2);
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES,
+ 2);
+
+ return conf;
+ }
+
+ @Before
+ public void setup() throws Exception {
+ conf = createCachingConf();
+ cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
cluster.waitActive();
dfs = cluster.getFileSystem();
proto = cluster.getNameNodeRpc();
+ namenode = cluster.getNameNode();
prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
- LogManager.getLogger(CacheReplicationMonitor.class).setLevel(Level.TRACE);
+ LogManager.getLogger(CacheReplicationMonitor.class.getName()).setLevel(
+ Level.TRACE);
}
@After
@@ -127,7 +152,7 @@ public class TestCacheDirectives {
final String poolName = "pool1";
CachePoolInfo info = new CachePoolInfo(poolName).
setOwnerName("bob").setGroupName("bobgroup").
- setMode(new FsPermission((short)0755)).setWeight(150);
+ setMode(new FsPermission((short)0755)).setLimit(150l);
// Add a pool
dfs.addCachePool(info);
@@ -168,7 +193,7 @@ public class TestCacheDirectives {
// Modify the pool
info.setOwnerName("jane").setGroupName("janegroup")
- .setMode(new FsPermission((short)0700)).setWeight(314);
+ .setMode(new FsPermission((short)0700)).setLimit(314l);
dfs.modifyCachePool(info);
// Do some invalid modify pools
@@ -263,10 +288,10 @@ public class TestCacheDirectives {
String ownerName = "abc";
String groupName = "123";
FsPermission mode = new FsPermission((short)0755);
- int weight = 150;
+ long limit = 150;
dfs.addCachePool(new CachePoolInfo(poolName).
setOwnerName(ownerName).setGroupName(groupName).
- setMode(mode).setWeight(weight));
+ setMode(mode).setLimit(limit));
RemoteIterator<CachePoolEntry> iter = dfs.listCachePools();
CachePoolInfo info = iter.next().getInfo();
@@ -277,10 +302,10 @@ public class TestCacheDirectives {
ownerName = "def";
groupName = "456";
mode = new FsPermission((short)0700);
- weight = 151;
+ limit = 151;
dfs.modifyCachePool(new CachePoolInfo(poolName).
setOwnerName(ownerName).setGroupName(groupName).
- setMode(mode).setWeight(weight));
+ setMode(mode).setLimit(limit));
iter = dfs.listCachePools();
info = iter.next().getInfo();
@@ -288,7 +313,7 @@ public class TestCacheDirectives {
assertEquals(ownerName, info.getOwnerName());
assertEquals(groupName, info.getGroupName());
assertEquals(mode, info.getMode());
- assertEquals(Integer.valueOf(weight), info.getWeight());
+ assertEquals(limit, (long)info.getLimit());
dfs.removeCachePool(poolName);
iter = dfs.listCachePools();
@@ -495,30 +520,22 @@ public class TestCacheDirectives {
@Test(timeout=60000)
public void testCacheManagerRestart() throws Exception {
- cluster.shutdown();
- cluster = null;
- HdfsConfiguration conf = createCachingConf();
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
-
- cluster.waitActive();
- DistributedFileSystem dfs = cluster.getFileSystem();
-
// Create and validate a pool
final String pool = "poolparty";
String groupName = "partygroup";
FsPermission mode = new FsPermission((short)0777);
- int weight = 747;
+ long limit = 747;
dfs.addCachePool(new CachePoolInfo(pool)
.setGroupName(groupName)
.setMode(mode)
- .setWeight(weight));
+ .setLimit(limit));
RemoteIterator<CachePoolEntry> pit = dfs.listCachePools();
assertTrue("No cache pools found", pit.hasNext());
CachePoolInfo info = pit.next().getInfo();
assertEquals(pool, info.getPoolName());
assertEquals(groupName, info.getGroupName());
assertEquals(mode, info.getMode());
- assertEquals(weight, (int)info.getWeight());
+ assertEquals(limit, (long)info.getLimit());
assertFalse("Unexpected # of cache pools found", pit.hasNext());
// Create some cache entries
@@ -556,7 +573,7 @@ public class TestCacheDirectives {
assertEquals(pool, info.getPoolName());
assertEquals(groupName, info.getGroupName());
assertEquals(mode, info.getMode());
- assertEquals(weight, (int)info.getWeight());
+ assertEquals(limit, (long)info.getLimit());
assertFalse("Unexpected # of cache pools found", pit.hasNext());
dit = dfs.listCacheDirectives(null);
@@ -762,91 +779,64 @@ public class TestCacheDirectives {
numCachedReplicas);
}
- private static final long BLOCK_SIZE = 512;
- private static final int NUM_DATANODES = 4;
-
- // Most Linux installs will allow non-root users to lock 64KB.
- private static final long CACHE_CAPACITY = 64 * 1024 / NUM_DATANODES;
-
- private static HdfsConfiguration createCachingConf() {
- HdfsConfiguration conf = new HdfsConfiguration();
- conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
- conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, CACHE_CAPACITY);
- conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
- conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, true);
- conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
- conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
- return conf;
- }
-
@Test(timeout=120000)
public void testWaitForCachedReplicas() throws Exception {
- HdfsConfiguration conf = createCachingConf();
FileSystemTestHelper helper = new FileSystemTestHelper();
- MiniDFSCluster cluster =
- new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
-
- try {
- cluster.waitActive();
- DistributedFileSystem dfs = cluster.getFileSystem();
- final NameNode namenode = cluster.getNameNode();
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- return ((namenode.getNamesystem().getCacheCapacity() ==
- (NUM_DATANODES * CACHE_CAPACITY)) &&
- (namenode.getNamesystem().getCacheUsed() == 0));
- }
- }, 500, 60000);
-
- NamenodeProtocols nnRpc = namenode.getRpcServer();
- Path rootDir = helper.getDefaultWorkingDirectory(dfs);
- // Create the pool
- final String pool = "friendlyPool";
- nnRpc.addCachePool(new CachePoolInfo("friendlyPool"));
- // Create some test files
- final int numFiles = 2;
- final int numBlocksPerFile = 2;
- final List<String> paths = new ArrayList<String>(numFiles);
- for (int i=0; i<numFiles; i++) {
- Path p = new Path(rootDir, "testCachePaths-" + i);
- FileSystemTestHelper.createFile(dfs, p, numBlocksPerFile,
- (int)BLOCK_SIZE);
- paths.add(p.toUri().getPath());
- }
- // Check the initial statistics at the namenode
- waitForCachedBlocks(namenode, 0, 0, "testWaitForCachedReplicas:0");
- // Cache and check each path in sequence
- int expected = 0;
- for (int i=0; i<numFiles; i++) {
- CacheDirectiveInfo directive =
- new CacheDirectiveInfo.Builder().
- setPath(new Path(paths.get(i))).
- setPool(pool).
- build();
- nnRpc.addCacheDirective(directive);
- expected += numBlocksPerFile;
- waitForCachedBlocks(namenode, expected, expected,
- "testWaitForCachedReplicas:1");
- }
- // Uncache and check each path in sequence
- RemoteIterator<CacheDirectiveEntry> entries =
- new CacheDirectiveIterator(nnRpc, null);
- for (int i=0; i<numFiles; i++) {
- CacheDirectiveEntry entry = entries.next();
- nnRpc.removeCacheDirective(entry.getInfo().getId());
- expected -= numBlocksPerFile;
- waitForCachedBlocks(namenode, expected, expected,
- "testWaitForCachedReplicas:2");
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return ((namenode.getNamesystem().getCacheCapacity() ==
+ (NUM_DATANODES * CACHE_CAPACITY)) &&
+ (namenode.getNamesystem().getCacheUsed() == 0));
}
- } finally {
- cluster.shutdown();
+ }, 500, 60000);
+
+ NamenodeProtocols nnRpc = namenode.getRpcServer();
+ Path rootDir = helper.getDefaultWorkingDirectory(dfs);
+ // Create the pool
+ final String pool = "friendlyPool";
+ nnRpc.addCachePool(new CachePoolInfo("friendlyPool"));
+ // Create some test files
+ final int numFiles = 2;
+ final int numBlocksPerFile = 2;
+ final List<String> paths = new ArrayList<String>(numFiles);
+ for (int i=0; i<numFiles; i++) {
+ Path p = new Path(rootDir, "testCachePaths-" + i);
+ FileSystemTestHelper.createFile(dfs, p, numBlocksPerFile,
+ (int)BLOCK_SIZE);
+ paths.add(p.toUri().getPath());
+ }
+ // Check the initial statistics at the namenode
+ waitForCachedBlocks(namenode, 0, 0, "testWaitForCachedReplicas:0");
+ // Cache and check each path in sequence
+ int expected = 0;
+ for (int i=0; i<numFiles; i++) {
+ CacheDirectiveInfo directive =
+ new CacheDirectiveInfo.Builder().
+ setPath(new Path(paths.get(i))).
+ setPool(pool).
+ build();
+ nnRpc.addCacheDirective(directive, EnumSet.noneOf(CacheFlag.class));
+ expected += numBlocksPerFile;
+ waitForCachedBlocks(namenode, expected, expected,
+ "testWaitForCachedReplicas:1");
+ }
+ // Uncache and check each path in sequence
+ RemoteIterator<CacheDirectiveEntry> entries =
+ new CacheDirectiveIterator(nnRpc, null);
+ for (int i=0; i<numFiles; i++) {
+ CacheDirectiveEntry entry = entries.next();
+ nnRpc.removeCacheDirective(entry.getInfo().getId());
+ expected -= numBlocksPerFile;
+ waitForCachedBlocks(namenode, expected, expected,
+ "testWaitForCachedReplicas:2");
}
}
@Test(timeout=120000)
public void testAddingCacheDirectiveInfosWhenCachingIsDisabled()
throws Exception {
+ cluster.shutdown();
HdfsConfiguration conf = createCachingConf();
conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, false);
MiniDFSCluster cluster =
@@ -894,103 +884,92 @@ public class TestCacheDirectives {
@Test(timeout=120000)
public void testWaitForCachedReplicasInDirectory() throws Exception {
- HdfsConfiguration conf = createCachingConf();
- MiniDFSCluster cluster =
- new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
-
- try {
- cluster.waitActive();
- DistributedFileSystem dfs = cluster.getFileSystem();
- NameNode namenode = cluster.getNameNode();
- // Create the pool
- final String pool = "friendlyPool";
- final CachePoolInfo poolInfo = new CachePoolInfo(pool);
- dfs.addCachePool(poolInfo);
- // Create some test files
- final List<Path> paths = new LinkedList<Path>();
- paths.add(new Path("/foo/bar"));
- paths.add(new Path("/foo/baz"));
- paths.add(new Path("/foo2/bar2"));
- paths.add(new Path("/foo2/baz2"));
- dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
- dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
- final int numBlocksPerFile = 2;
- for (Path path : paths) {
- FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
- (int)BLOCK_SIZE, (short)3, false);
- }
- waitForCachedBlocks(namenode, 0, 0,
- "testWaitForCachedReplicasInDirectory:0");
+ // Create the pool
+ final String pool = "friendlyPool";
+ final CachePoolInfo poolInfo = new CachePoolInfo(pool);
+ dfs.addCachePool(poolInfo);
+ // Create some test files
+ final List<Path> paths = new LinkedList<Path>();
+ paths.add(new Path("/foo/bar"));
+ paths.add(new Path("/foo/baz"));
+ paths.add(new Path("/foo2/bar2"));
+ paths.add(new Path("/foo2/baz2"));
+ dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
+ dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
+ final int numBlocksPerFile = 2;
+ for (Path path : paths) {
+ FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
+ (int)BLOCK_SIZE, (short)3, false);
+ }
+ waitForCachedBlocks(namenode, 0, 0,
+ "testWaitForCachedReplicasInDirectory:0");
- // cache entire directory
- long id = dfs.addCacheDirective(
- new CacheDirectiveInfo.Builder().
- setPath(new Path("/foo")).
- setReplication((short)2).
- setPool(pool).
- build());
- waitForCachedBlocks(namenode, 4, 8,
- "testWaitForCachedReplicasInDirectory:1:blocks");
- // Verify that listDirectives gives the stats we want.
- waitForCacheDirectiveStats(dfs,
- 4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
- 2, 2,
+ // cache entire directory
+ long id = dfs.addCacheDirective(
new CacheDirectiveInfo.Builder().
- setPath(new Path("/foo")).
- build(),
- "testWaitForCachedReplicasInDirectory:1:directive");
- waitForCachePoolStats(dfs,
- 4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
- 2, 2,
- poolInfo, "testWaitForCachedReplicasInDirectory:1:pool");
+ setPath(new Path("/foo")).
+ setReplication((short)2).
+ setPool(pool).
+ build());
+ waitForCachedBlocks(namenode, 4, 8,
+ "testWaitForCachedReplicasInDirectory:1:blocks");
+ // Verify that listDirectives gives the stats we want.
+ waitForCacheDirectiveStats(dfs,
+ 4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
+ 2, 2,
+ new CacheDirectiveInfo.Builder().
+ setPath(new Path("/foo")).
+ build(),
+ "testWaitForCachedReplicasInDirectory:1:directive");
+ waitForCachePoolStats(dfs,
+ 4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
+ 2, 2,
+ poolInfo, "testWaitForCachedReplicasInDirectory:1:pool");
- long id2 = dfs.addCacheDirective(
- new CacheDirectiveInfo.Builder().
- setPath(new Path("/foo/bar")).
- setReplication((short)4).
- setPool(pool).
- build());
- // wait for an additional 2 cached replicas to come up
- waitForCachedBlocks(namenode, 4, 10,
- "testWaitForCachedReplicasInDirectory:2:blocks");
- // the directory directive's stats are unchanged
- waitForCacheDirectiveStats(dfs,
- 4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
- 2, 2,
- new CacheDirectiveInfo.Builder().
- setPath(new Path("/foo")).
- build(),
- "testWaitForCachedReplicasInDirectory:2:directive-1");
- // verify /foo/bar's stats
- waitForCacheDirectiveStats(dfs,
- 4 * numBlocksPerFile * BLOCK_SIZE,
- // only 3 because the file only has 3 replicas, not 4 as requested.
- 3 * numBlocksPerFile * BLOCK_SIZE,
- 1,
- // only 0 because the file can't be fully cached
- 0,
+ long id2 = dfs.addCacheDirective(
new CacheDirectiveInfo.Builder().
- setPath(new Path("/foo/bar")).
- build(),
- "testWaitForCachedReplicasInDirectory:2:directive-2");
- waitForCachePoolStats(dfs,
- (4+4) * numBlocksPerFile * BLOCK_SIZE,
- (4+3) * numBlocksPerFile * BLOCK_SIZE,
- 3, 2,
- poolInfo, "testWaitForCachedReplicasInDirectory:2:pool");
-
- // remove and watch numCached go to 0
- dfs.removeCacheDirective(id);
- dfs.removeCacheDirective(id2);
- waitForCachedBlocks(namenode, 0, 0,
- "testWaitForCachedReplicasInDirectory:3:blocks");
- waitForCachePoolStats(dfs,
- 0, 0,
- 0, 0,
- poolInfo, "testWaitForCachedReplicasInDirectory:3:pool");
- } finally {
- cluster.shutdown();
- }
+ setPath(new Path("/foo/bar")).
+ setReplication((short)4).
+ setPool(pool).
+ build());
+ // wait for an additional 2 cached replicas to come up
+ waitForCachedBlocks(namenode, 4, 10,
+ "testWaitForCachedReplicasInDirectory:2:blocks");
+ // the directory directive's stats are unchanged
+ waitForCacheDirectiveStats(dfs,
+ 4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
+ 2, 2,
+ new CacheDirectiveInfo.Builder().
+ setPath(new Path("/foo")).
+ build(),
+ "testWaitForCachedReplicasInDirectory:2:directive-1");
+ // verify /foo/bar's stats
+ waitForCacheDirectiveStats(dfs,
+ 4 * numBlocksPerFile * BLOCK_SIZE,
+ // only 3 because the file only has 3 replicas, not 4 as requested.
+ 3 * numBlocksPerFile * BLOCK_SIZE,
+ 1,
+ // only 0 because the file can't be fully cached
+ 0,
+ new CacheDirectiveInfo.Builder().
+ setPath(new Path("/foo/bar")).
+ build(),
+ "testWaitForCachedReplicasInDirectory:2:directive-2");
+ waitForCachePoolStats(dfs,
+ (4+4) * numBlocksPerFile * BLOCK_SIZE,
+ (4+3) * numBlocksPerFile * BLOCK_SIZE,
+ 3, 2,
+ poolInfo, "testWaitForCachedReplicasInDirectory:2:pool");
+
+ // remove and watch numCached go to 0
+ dfs.removeCacheDirective(id);
+ dfs.removeCacheDirective(id2);
+ waitForCachedBlocks(namenode, 0, 0,
+ "testWaitForCachedReplicasInDirectory:3:blocks");
+ waitForCachePoolStats(dfs,
+ 0, 0,
+ 0, 0,
+ poolInfo, "testWaitForCachedReplicasInDirectory:3:pool");
}
/**
@@ -1000,68 +979,57 @@ public class TestCacheDirectives {
*/
@Test(timeout=120000)
public void testReplicationFactor() throws Exception {
- HdfsConfiguration conf = createCachingConf();
- MiniDFSCluster cluster =
- new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
-
- try {
- cluster.waitActive();
- DistributedFileSystem dfs = cluster.getFileSystem();
- NameNode namenode = cluster.getNameNode();
- // Create the pool
- final String pool = "friendlyPool";
- dfs.addCachePool(new CachePoolInfo(pool));
- // Create some test files
- final List<Path> paths = new LinkedList<Path>();
- paths.add(new Path("/foo/bar"));
- paths.add(new Path("/foo/baz"));
- paths.add(new Path("/foo2/bar2"));
- paths.add(new Path("/foo2/baz2"));
- dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
- dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
- final int numBlocksPerFile = 2;
- for (Path path : paths) {
- FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
- (int)BLOCK_SIZE, (short)3, false);
- }
- waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:0");
- checkNumCachedReplicas(dfs, paths, 0, 0);
- // cache directory
- long id = dfs.addCacheDirective(
+ // Create the pool
+ final String pool = "friendlyPool";
+ dfs.addCachePool(new CachePoolInfo(pool));
+ // Create some test files
+ final List<Path> paths = new LinkedList<Path>();
+ paths.add(new Path("/foo/bar"));
+ paths.add(new Path("/foo/baz"));
+ paths.add(new Path("/foo2/bar2"));
+ paths.add(new Path("/foo2/baz2"));
+ dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
+ dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
+ final int numBlocksPerFile = 2;
+ for (Path path : paths) {
+ FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
+ (int)BLOCK_SIZE, (short)3, false);
+ }
+ waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:0");
+ checkNumCachedReplicas(dfs, paths, 0, 0);
+ // cache directory
+ long id = dfs.addCacheDirective(
+ new CacheDirectiveInfo.Builder().
+ setPath(new Path("/foo")).
+ setReplication((short)1).
+ setPool(pool).
+ build());
+ waitForCachedBlocks(namenode, 4, 4, "testReplicationFactor:1");
+ checkNumCachedReplicas(dfs, paths, 4, 4);
+ // step up the replication factor
+ for (int i=2; i<=3; i++) {
+ dfs.modifyCacheDirective(
new CacheDirectiveInfo.Builder().
- setPath(new Path("/foo")).
- setReplication((short)1).
- setPool(pool).
- build());
- waitForCachedBlocks(namenode, 4, 4, "testReplicationFactor:1");
- checkNumCachedReplicas(dfs, paths, 4, 4);
- // step up the replication factor
- for (int i=2; i<=3; i++) {
- dfs.modifyCacheDirective(
- new CacheDirectiveInfo.Builder().
- setId(id).
- setReplication((short)i).
- build());
- waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:2");
- checkNumCachedReplicas(dfs, paths, 4, 4*i);
- }
- // step it down
- for (int i=2; i>=1; i--) {
- dfs.modifyCacheDirective(
- new CacheDirectiveInfo.Builder().
- setId(id).
- setReplication((short)i).
- build());
- waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:3");
- checkNumCachedReplicas(dfs, paths, 4, 4*i);
- }
- // remove and watch numCached go to 0
- dfs.removeCacheDirective(id);
- waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:4");
- checkNumCachedReplicas(dfs, paths, 0, 0);
- } finally {
- cluster.shutdown();
+ setId(id).
+ setReplication((short)i).
+ build());
+ waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:2");
+ checkNumCachedReplicas(dfs, paths, 4, 4*i);
+ }
+ // step it down
+ for (int i=2; i>=1; i--) {
+ dfs.modifyCacheDirective(
+ new CacheDirectiveInfo.Builder().
+ setId(id).
+ setReplication((short)i).
+ build());
+ waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:3");
+ checkNumCachedReplicas(dfs, paths, 4, 4*i);
}
+ // remove and watch numCached go to 0
+ dfs.removeCacheDirective(id);
+ waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:4");
+ checkNumCachedReplicas(dfs, paths, 0, 0);
}
@Test(timeout=60000)
@@ -1081,11 +1049,12 @@ public class TestCacheDirectives {
assertNull("Unexpected owner name", info.getOwnerName());
assertNull("Unexpected group name", info.getGroupName());
assertNull("Unexpected mode", info.getMode());
- assertNull("Unexpected weight", info.getWeight());
+ assertNull("Unexpected limit", info.getLimit());
// Modify the pool so myuser is now the owner
+ final long limit = 99;
dfs.modifyCachePool(new CachePoolInfo(poolName)
.setOwnerName(myUser.getShortUserName())
- .setWeight(99));
+ .setLimit(limit));
// Should see full info
it = myDfs.listCachePools();
info = it.next().getInfo();
@@ -1096,60 +1065,127 @@ public class TestCacheDirectives {
assertNotNull("Expected group name", info.getGroupName());
assertEquals("Mismatched mode", (short) 0700,
info.getMode().toShort());
- assertEquals("Mismatched weight", 99, (int)info.getWeight());
+ assertEquals("Mismatched limit", limit, (long)info.getLimit());
}
- @Test(timeout=60000)
+ @Test(timeout=120000)
public void testExpiry() throws Exception {
- HdfsConfiguration conf = createCachingConf();
- MiniDFSCluster cluster =
- new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+ String pool = "pool1";
+ dfs.addCachePool(new CachePoolInfo(pool));
+ Path p = new Path("/mypath");
+ DFSTestUtil.createFile(dfs, p, BLOCK_SIZE*2, (short)2, 0x999);
+ // Expire after test timeout
+ Date start = new Date();
+ Date expiry = DateUtils.addSeconds(start, 120);
+ final long id = dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
+ .setPath(p)
+ .setPool(pool)
+ .setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(expiry))
+ .setReplication((short)2)
+ .build());
+ waitForCachedBlocks(cluster.getNameNode(), 2, 4, "testExpiry:1");
+ // Change it to expire sooner
+ dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
+ .setExpiration(Expiration.newRelative(0)).build());
+ waitForCachedBlocks(cluster.getNameNode(), 0, 0, "testExpiry:2");
+ RemoteIterator<CacheDirectiveEntry> it = dfs.listCacheDirectives(null);
+ CacheDirectiveEntry ent = it.next();
+ assertFalse(it.hasNext());
+ Date entryExpiry = new Date(ent.getInfo().getExpiration().getMillis());
+ assertTrue("Directive should have expired",
+ entryExpiry.before(new Date()));
+ // Change it back to expire later
+ dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
+ .setExpiration(Expiration.newRelative(120000)).build());
+ waitForCachedBlocks(cluster.getNameNode(), 2, 4, "testExpiry:3");
+ it = dfs.listCacheDirectives(null);
+ ent = it.next();
+ assertFalse(it.hasNext());
+ entryExpiry = new Date(ent.getInfo().getExpiration().getMillis());
+ assertTrue("Directive should not have expired",
+ entryExpiry.after(new Date()));
+ // Verify that setting a negative TTL throws an error
try {
- DistributedFileSystem dfs = cluster.getFileSystem();
- String pool = "pool1";
- dfs.addCachePool(new CachePoolInfo(pool));
- Path p = new Path("/mypath");
- DFSTestUtil.createFile(dfs, p, BLOCK_SIZE*2, (short)2, 0x999);
- // Expire after test timeout
- Date start = new Date();
- Date expiry = DateUtils.addSeconds(start, 120);
- final long id = dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
- .setPath(p)
- .setPool(pool)
- .setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(expiry))
- .setReplication((short)2)
- .build());
- waitForCachedBlocks(cluster.getNameNode(), 2, 4, "testExpiry:1");
- // Change it to expire sooner
- dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
- .setExpiration(Expiration.newRelative(0)).build());
- waitForCachedBlocks(cluster.getNameNode(), 0, 0, "testExpiry:2");
- RemoteIterator<CacheDirectiveEntry> it = dfs.listCacheDirectives(null);
- CacheDirectiveEntry ent = it.next();
- assertFalse(it.hasNext());
- Date entryExpiry = new Date(ent.getInfo().getExpiration().getMillis());
- assertTrue("Directive should have expired",
- entryExpiry.before(new Date()));
- // Change it back to expire later
dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
- .setExpiration(Expiration.newRelative(120000)).build());
- waitForCachedBlocks(cluster.getNameNode(), 2, 4, "testExpiry:3");
- it = dfs.listCacheDirectives(null);
- ent = it.next();
- assertFalse(it.hasNext());
- entryExpiry = new Date(ent.getInfo().getExpiration().getMillis());
- assertTrue("Directive should not have expired",
- entryExpiry.after(new Date()));
- // Verify that setting a negative TTL throws an error
- try {
- dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
- .setExpiration(Expiration.newRelative(-1)).build());
- } catch (InvalidRequestException e) {
- GenericTestUtils
- .assertExceptionContains("Cannot set a negative expiration", e);
- }
- } finally {
- cluster.shutdown();
+ .setExpiration(Expiration.newRelative(-1)).build());
+ } catch (InvalidRequestException e) {
+ GenericTestUtils
+ .assertExceptionContains("Cannot set a negative expiration", e);
+ }
+ }
+
+ @Test(timeout=120000)
+ public void testLimit() throws Exception {
+ try {
+ dfs.addCachePool(new CachePoolInfo("poolofnegativity").setLimit(-99l));
+ fail("Should not be able to set a negative limit");
+ } catch (InvalidRequestException e) {
+ GenericTestUtils.assertExceptionContains("negative", e);
+ }
+ final String destiny = "poolofdestiny";
+ final Path path1 = new Path("/destiny");
+ DFSTestUtil.createFile(dfs, path1, 2*BLOCK_SIZE, (short)1, 0x9494);
+ // Start off with a limit that is too small
+ final CachePoolInfo poolInfo = new CachePoolInfo(destiny)
+ .setLimit(2*BLOCK_SIZE-1);
+ dfs.addCachePool(poolInfo);
+ final CacheDirectiveInfo info1 = new CacheDirectiveInfo.Builder()
+ .setPool(destiny).setPath(path1).build();
+ try {
+ dfs.addCacheDirective(info1);
+ fail("Should not be able to cache when there is no more limit");
+ } catch (InvalidRequestException e) {
+ GenericTestUtils.assertExceptionContains("remaining capacity", e);
+ }
+ // Raise the limit up to fit and it should work this time
+ poolInfo.setLimit(2*BLOCK_SIZE);
+ dfs.modifyCachePool(poolInfo);
+ long id1 = dfs.addCacheDirective(info1);
+ waitForCachePoolStats(dfs,
+ 2*BLOCK_SIZE, 2*BLOCK_SIZE,
+ 1, 1,
+ poolInfo, "testLimit:1");
+ // Adding another file, it shouldn't be cached
+ final Path path2 = new Path("/failure");
+ DFSTestUtil.createFile(dfs, path2, BLOCK_SIZE, (short)1, 0x9495);
+ try {
+ dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
+ .setPool(destiny).setPath(path2).build(),
+ EnumSet.noneOf(CacheFlag.class));
+ fail("Should not be able to add another cached file");
+ } catch (InvalidRequestException e) {
+ GenericTestUtils.assertExceptionContains("remaining capacity", e);
}
+ // Bring the limit down, the first file should get uncached
+ poolInfo.setLimit(BLOCK_SIZE);
+ dfs.modifyCachePool(poolInfo);
+ waitForCachePoolStats(dfs,
+ 2*BLOCK_SIZE, 0,
+ 1, 0,
+ poolInfo, "testLimit:2");
+ RemoteIterator<CachePoolEntry> it = dfs.listCachePools();
+ assertTrue("Expected a cache pool", it.hasNext());
+ CachePoolStats stats = it.next().getStats();
+ assertEquals("Overlimit bytes should be difference of needed and limit",
+ BLOCK_SIZE, stats.getBytesOverlimit());
+ // Moving a directive to a pool without enough limit should fail
+ CachePoolInfo inadequate =
+ new CachePoolInfo("poolofinadequacy").setLimit(BLOCK_SIZE);
+ dfs.addCachePool(inadequate);
+ try {
+ dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(info1)
+ .setId(id1).setPool(inadequate.getPoolName()).build(),
+ EnumSet.noneOf(CacheFlag.class));
+ } catch(InvalidRequestException e) {
+ GenericTestUtils.assertExceptionContains("remaining capacity", e);
+ }
+ // Succeeds when force=true
+ dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(info1).setId(id1)
+ .setPool(inadequate.getPoolName()).build(),
+ EnumSet.of(CacheFlag.FORCE));
+ // Also can add with force=true
+ dfs.addCacheDirective(
+ new CacheDirectiveInfo.Builder().setPool(inadequate.getPoolName())
+ .setPath(path1).build(), EnumSet.of(CacheFlag.FORCE));
}
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java Thu Dec 19 02:03:47 2013
@@ -20,6 +20,9 @@ package org.apache.hadoop.hdfs.server.na
import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
import static org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil.assertNNHasCheckpoints;
import static org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil.getNameNodeCurrentDirs;
+import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
+import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -72,6 +75,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
@@ -106,6 +110,7 @@ public class TestCheckpoint {
}
static final Log LOG = LogFactory.getLog(TestCheckpoint.class);
+ static final String NN_METRICS = "NameNodeActivity";
static final long seed = 0xDEADBEEFL;
static final int blockSize = 4096;
@@ -1048,6 +1053,14 @@ public class TestCheckpoint {
//
secondary = startSecondaryNameNode(conf);
secondary.doCheckpoint();
+
+ MetricsRecordBuilder rb = getMetrics(NN_METRICS);
+ assertCounterGt("GetImageNumOps", 0, rb);
+ assertCounterGt("GetEditNumOps", 0, rb);
+ assertCounterGt("PutImageNumOps", 0, rb);
+ assertGaugeGt("GetImageAvgTime", 0.0, rb);
+ assertGaugeGt("GetEditAvgTime", 0.0, rb);
+ assertGaugeGt("PutImageAvgTime", 0.0, rb);
} finally {
fileSys.close();
cleanup(secondary);
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java Thu Dec 19 02:03:47 2013
@@ -31,6 +31,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
+import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
@@ -383,4 +384,33 @@ public class TestFSEditLogLoader {
assertTrue(!validation.hasCorruptHeader());
assertEquals(HdfsConstants.INVALID_TXID, validation.getEndTxId());
}
+
+ private static final Map<Byte, FSEditLogOpCodes> byteToEnum =
+ new HashMap<Byte, FSEditLogOpCodes>();
+ static {
+ for(FSEditLogOpCodes opCode : FSEditLogOpCodes.values()) {
+ byteToEnum.put(opCode.getOpCode(), opCode);
+ }
+ }
+
+ private static FSEditLogOpCodes fromByte(byte opCode) {
+ return byteToEnum.get(opCode);
+ }
+
+ @Test
+ public void testFSEditLogOpCodes() throws IOException {
+ //try all codes
+ for(FSEditLogOpCodes c : FSEditLogOpCodes.values()) {
+ final byte code = c.getOpCode();
+ assertEquals("c=" + c + ", code=" + code,
+ c, FSEditLogOpCodes.fromByte(code));
+ }
+
+ //try all byte values
+ for(int b = 0; b < (1 << Byte.SIZE); b++) {
+ final byte code = (byte)b;
+ assertEquals("b=" + b + ", code=" + code,
+ fromByte(code), FSEditLogOpCodes.fromByte(code));
+ }
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java Thu Dec 19 02:03:47 2013
@@ -41,8 +41,8 @@ import org.apache.hadoop.hdfs.client.Hdf
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiff;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot.DirectoryDiff;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.log4j.Level;
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java Thu Dec 19 02:03:47 2013
@@ -61,6 +61,7 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Time;
import org.junit.Test;
import org.mockito.Mockito;
@@ -780,7 +781,7 @@ public class TestINodeFile {
}
System.out.println("Adding component " + DFSUtil.bytes2String(component));
dir = new INodeDirectory(++id, component, permstatus, 0);
- prev.addChild(dir, false, null, null);
+ prev.addChild(dir, false, null);
prev = dir;
}
return dir; // Last Inode in the chain
@@ -921,6 +922,7 @@ public class TestINodeFile {
public void testDotdotInodePath() throws Exception {
final Configuration conf = new Configuration();
MiniDFSCluster cluster = null;
+ DFSClient client = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
@@ -933,7 +935,7 @@ public class TestINodeFile {
long parentId = fsdir.getINode("/").getId();
String testPath = "/.reserved/.inodes/" + dirId + "/..";
- DFSClient client = new DFSClient(NameNode.getAddress(conf), conf);
+ client = new DFSClient(NameNode.getAddress(conf), conf);
HdfsFileStatus status = client.getFileInfo(testPath);
assertTrue(parentId == status.getFileId());
@@ -943,6 +945,7 @@ public class TestINodeFile {
assertTrue(parentId == status.getFileId());
} finally {
+ IOUtils.cleanup(LOG, client);
if (cluster != null) {
cluster.shutdown();
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java Thu Dec 19 02:03:47 2013
@@ -31,7 +31,6 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -206,8 +205,7 @@ public class TestSnapshotPathINodes {
// Check the INode for file1 (snapshot file)
INode snapshotFileNode = inodes[inodes.length - 1];
assertINodeFile(snapshotFileNode, file1);
- assertTrue(snapshotFileNode.getParent() instanceof
- INodeDirectoryWithSnapshot);
+ assertTrue(snapshotFileNode.getParent().isWithSnapshot());
// Call getExistingPathINodes and request only one INode.
nodesInPath = INodesInPath.resolve(fsdir.rootDir, components, 1, false);