You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2018/01/24 07:32:26 UTC
[31/50] [abbrv] hadoop git commit: HDFS-12291: [SPS]: Provide a
mechanism to recursively iterate and satisfy storage policy of all the files
under the given dir. Contributed by Surendra Singh Lilhore.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a2808be4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index 3375590..57e9f94 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -21,6 +21,9 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KE
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.slf4j.LoggerFactory.getLogger;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -61,8 +64,10 @@ import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
import com.google.common.base.Supplier;
@@ -71,6 +76,12 @@ import com.google.common.base.Supplier;
* moved and finding its suggested target locations to move.
*/
public class TestStoragePolicySatisfier {
+
+ {
+ GenericTestUtils.setLogLevel(
+ getLogger(FSTreeTraverser.class), Level.DEBUG);
+ }
+
private static final String ONE_SSD = "ONE_SSD";
private static final String COLD = "COLD";
private static final Logger LOG =
@@ -341,7 +352,9 @@ public class TestStoragePolicySatisfier {
// take no effect for the sub-dir's file in the directory.
DFSTestUtil.waitExpectedStorageType(
- subFile2, StorageType.DEFAULT, 3, 30000, dfs);
+ subFile2, StorageType.SSD, 1, 30000, dfs);
+ DFSTestUtil.waitExpectedStorageType(
+ subFile2, StorageType.DISK, 2, 30000, dfs);
} finally {
shutdownCluster();
}
@@ -1083,6 +1096,368 @@ public class TestStoragePolicySatisfier {
}
}
+ /**
+ * Test SPS for empty directory, xAttr should be removed.
+ */
+ @Test(timeout = 300000)
+ public void testSPSForEmptyDirectory() throws IOException, TimeoutException,
+ InterruptedException {
+ MiniDFSCluster cluster = null;
+ try {
+ Configuration conf = new Configuration();
+ conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+ true);
+ cluster = new MiniDFSCluster.Builder(conf).build();
+ cluster.waitActive();
+ DistributedFileSystem fs = cluster.getFileSystem();
+ Path emptyDir = new Path("/emptyDir");
+ fs.mkdirs(emptyDir);
+ fs.satisfyStoragePolicy(emptyDir);
+ // Make sure satisfy xattr has been removed.
+ DFSTestUtil.waitForXattrRemoved("/emptyDir",
+ XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000);
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Test SPS for a non-existent directory, FileNotFoundException is expected.
+ */
+ @Test(timeout = 300000)
+ public void testSPSForNonExistDirectory() throws Exception {
+ MiniDFSCluster cluster = null;
+ try {
+ Configuration conf = new Configuration();
+ conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+ true);
+ cluster = new MiniDFSCluster.Builder(conf).build();
+ cluster.waitActive();
+ DistributedFileSystem fs = cluster.getFileSystem();
+ Path emptyDir = new Path("/emptyDir");
+ try {
+ fs.satisfyStoragePolicy(emptyDir);
+ fail("FileNotFoundException should throw");
+ } catch (FileNotFoundException e) {
+ // expected, the directory was never created
+ }
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Test SPS for directory tree which doesn't have files.
+ */
+ @Test(timeout = 300000)
+ public void testSPSWithDirectoryTreeWithoutFile() throws Exception {
+ MiniDFSCluster cluster = null;
+ try {
+ Configuration conf = new Configuration();
+ conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+ true);
+ cluster = new MiniDFSCluster.Builder(conf).build();
+ cluster.waitActive();
+ // Create directories
+ /*
+ * root
+ * |
+ * A--------C--------D
+ * |
+ * G----H----I
+ * |
+ * O
+ */
+ DistributedFileSystem fs = cluster.getFileSystem();
+ fs.mkdirs(new Path("/root/C/H/O"));
+ fs.mkdirs(new Path("/root/A"));
+ fs.mkdirs(new Path("/root/D"));
+ fs.mkdirs(new Path("/root/C/G"));
+ fs.mkdirs(new Path("/root/C/I"));
+ fs.satisfyStoragePolicy(new Path("/root"));
+ // Make sure satisfy xattr has been removed.
+ DFSTestUtil.waitForXattrRemoved("/root",
+ XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000);
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Test SPS for directory which has multilevel directories.
+ */
+ @Test(timeout = 300000)
+ public void testMultipleLevelDirectoryForSatisfyStoragePolicy()
+ throws Exception {
+ try {
+ StorageType[][] diskTypes = new StorageType[][] {
+ {StorageType.DISK, StorageType.ARCHIVE},
+ {StorageType.ARCHIVE, StorageType.SSD},
+ {StorageType.DISK, StorageType.DISK}};
+ config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+ config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+ true);
+ hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
+ storagesPerDatanode, capacity);
+ dfs = hdfsCluster.getFileSystem();
+ createDirectoryTree(dfs);
+
+ List<String> files = getDFSListOfTree();
+ dfs.setStoragePolicy(new Path("/root"), COLD);
+ dfs.satisfyStoragePolicy(new Path("/root"));
+ for (String fileName : files) {
+ // Wait till the block is moved to ARCHIVE
+ DFSTestUtil.waitExpectedStorageType(fileName, StorageType.ARCHIVE, 2,
+ 30000, dfs);
+ }
+ } finally {
+ shutdownCluster();
+ }
+ }
+
+ /**
+ * Test SPS for batch processing.
+ */
+ @Test(timeout = 300000)
+ public void testBatchProcessingForSPSDirectory() throws Exception {
+ try {
+ StorageType[][] diskTypes = new StorageType[][] {
+ {StorageType.DISK, StorageType.ARCHIVE},
+ {StorageType.ARCHIVE, StorageType.SSD},
+ {StorageType.DISK, StorageType.DISK}};
+ config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+ config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+ true);
+ // Set queue max capacity
+ config.setInt(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
+ 5);
+ hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
+ storagesPerDatanode, capacity);
+ dfs = hdfsCluster.getFileSystem();
+ createDirectoryTree(dfs);
+ List<String> files = getDFSListOfTree();
+ LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(LogFactory
+ .getLog(FSTreeTraverser.class));
+
+ dfs.setStoragePolicy(new Path("/root"), COLD);
+ dfs.satisfyStoragePolicy(new Path("/root"));
+ for (String fileName : files) {
+ // Wait till the block is moved to ARCHIVE
+ DFSTestUtil.waitExpectedStorageType(fileName, StorageType.ARCHIVE, 2,
+ 30000, dfs);
+ }
+ waitForBlocksMovementResult(files.size(), 30000);
+ String expectedLogMessage = "StorageMovementNeeded queue remaining"
+ + " capacity is zero";
+ assertTrue("Log output does not contain expected log message: "
+ + expectedLogMessage, logs.getOutput().contains(expectedLogMessage));
+ } finally {
+ shutdownCluster();
+ }
+ }
+
+
+ /**
+ * Test traverse when parent got deleted.
+ * 1. Delete /root/D/L when traversing Q
+ * 2. U, R, S should not be queued.
+ */
+ @Test
+ public void testTraverseWhenParentDeleted() throws Exception {
+ StorageType[][] diskTypes = new StorageType[][] {
+ {StorageType.DISK, StorageType.ARCHIVE},
+ {StorageType.ARCHIVE, StorageType.SSD},
+ {StorageType.DISK, StorageType.DISK}};
+ config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+ hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
+ storagesPerDatanode, capacity);
+ dfs = hdfsCluster.getFileSystem();
+ createDirectoryTree(dfs);
+
+ List<String> expectedTraverseOrder = getDFSListOfTree();
+
+ //Remove files which will not be traversed when the parent is deleted
+ expectedTraverseOrder.remove("/root/D/L/R");
+ expectedTraverseOrder.remove("/root/D/L/S");
+ expectedTraverseOrder.remove("/root/D/L/Q/U");
+ FSDirectory fsDir = hdfsCluster.getNamesystem().getFSDirectory();
+
+ //Queue limit can control the traverse logic to wait for some free
+ //entry in queue. After 10 files, traverse control will be on U.
+ StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
+ Mockito.when(sps.isRunning()).thenReturn(true);
+ BlockStorageMovementNeeded movmentNeededQueue =
+ new BlockStorageMovementNeeded(fsDir.getFSNamesystem(), sps, 10);
+ INode rootINode = fsDir.getINode("/root");
+ movmentNeededQueue.addToPendingDirQueue(rootINode.getId());
+ movmentNeededQueue.init();
+
+ //Wait for thread to reach U.
+ Thread.sleep(1000);
+
+ dfs.delete(new Path("/root/D/L"), true);
+
+ // Remove 10 elements to free the queue so that traversal can continue.
+ for (int i = 0; i < 10; i++) {
+ String path = expectedTraverseOrder.remove(0);
+ long trackId = movmentNeededQueue.get().getTrackId();
+ INode inode = fsDir.getInode(trackId);
+ assertTrue("Failed to traverse tree, expected " + path + " but got "
+ + inode.getFullPathName(), path.equals(inode.getFullPathName()));
+ }
+ //Wait to finish tree traverse
+ Thread.sleep(5000);
+
+ // Check remaining elements are traversed in order; U, R and S were
+ // removed from the expected list above and must not be added to the queue
+ for (String path : expectedTraverseOrder) {
+ long trackId = movmentNeededQueue.get().getTrackId();
+ INode inode = fsDir.getInode(trackId);
+ assertTrue("Failed to traverse tree, expected " + path + " but got "
+ + inode.getFullPathName(), path.equals(inode.getFullPathName()));
+ }
+ dfs.delete(new Path("/root"), true);
+ }
+
+ /**
+ * Test traverse when root parent got deleted.
+ * 1. Delete L (/root/D/L) when traversing Q
+ * 2. E, M, U, R, S should not be queued.
+ */
+ @Test
+ public void testTraverseWhenRootParentDeleted() throws Exception {
+ StorageType[][] diskTypes = new StorageType[][] {
+ {StorageType.DISK, StorageType.ARCHIVE},
+ {StorageType.ARCHIVE, StorageType.SSD},
+ {StorageType.DISK, StorageType.DISK}};
+ config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+ hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
+ storagesPerDatanode, capacity);
+ dfs = hdfsCluster.getFileSystem();
+ createDirectoryTree(dfs);
+
+ List<String> expectedTraverseOrder = getDFSListOfTree();
+
+ // Remove files which will not be traversed when the parent is deleted
+ expectedTraverseOrder.remove("/root/D/L/R");
+ expectedTraverseOrder.remove("/root/D/L/S");
+ expectedTraverseOrder.remove("/root/D/L/Q/U");
+ expectedTraverseOrder.remove("/root/D/M");
+ expectedTraverseOrder.remove("/root/E");
+ FSDirectory fsDir = hdfsCluster.getNamesystem().getFSDirectory();
+ StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
+ Mockito.when(sps.isRunning()).thenReturn(true);
+ // Queue limit can control the traverse logic to wait for some free
+ // entry in queue. After 10 files, traverse control will be on U.
+ BlockStorageMovementNeeded movmentNeededQueue =
+ new BlockStorageMovementNeeded(fsDir.getFSNamesystem(), sps, 10);
+ movmentNeededQueue.init();
+ INode rootINode = fsDir.getINode("/root");
+ movmentNeededQueue.addToPendingDirQueue(rootINode.getId());
+ // Wait for thread to reach U.
+ Thread.sleep(1000);
+
+ dfs.delete(new Path("/root/D/L"), true);
+
+ // Remove 10 elements to free the queue so that traversal can continue.
+ for (int i = 0; i < 10; i++) {
+ String path = expectedTraverseOrder.remove(0);
+ long trackId = movmentNeededQueue.get().getTrackId();
+ INode inode = fsDir.getInode(trackId);
+ assertTrue("Failed to traverse tree, expected " + path + " but got "
+ + inode.getFullPathName(), path.equals(inode.getFullPathName()));
+ }
+ // Wait to finish tree traverse
+ Thread.sleep(5000);
+
+ // Check remaining elements are traversed in order; E, M, U, R and S were
+ // removed from the expected list above and must not be added to the queue
+ for (String path : expectedTraverseOrder) {
+ long trackId = movmentNeededQueue.get().getTrackId();
+ INode inode = fsDir.getInode(trackId);
+ assertTrue("Failed to traverse tree, expected " + path + " but got "
+ + inode.getFullPathName(), path.equals(inode.getFullPathName()));
+ }
+ dfs.delete(new Path("/root"), true);
+ }
+
+ private static void createDirectoryTree(DistributedFileSystem dfs)
+ throws Exception {
+ // tree structure
+ /*
+ * root
+ * |
+ * A--------B--------C--------D--------E
+ * | |
+ * F----G----H----I J----K----L----M
+ * | |
+ * N----O----P Q----R----S
+ * | |
+ * T U
+ */
+ // create root Node and child
+ dfs.mkdirs(new Path("/root"));
+ DFSTestUtil.createFile(dfs, new Path("/root/A"), 1024, (short) 3, 0);
+ dfs.mkdirs(new Path("/root/B"));
+ DFSTestUtil.createFile(dfs, new Path("/root/C"), 1024, (short) 3, 0);
+ dfs.mkdirs(new Path("/root/D"));
+ DFSTestUtil.createFile(dfs, new Path("/root/E"), 1024, (short) 3, 0);
+
+ // Create /root/B child
+ DFSTestUtil.createFile(dfs, new Path("/root/B/F"), 1024, (short) 3, 0);
+ dfs.mkdirs(new Path("/root/B/G"));
+ DFSTestUtil.createFile(dfs, new Path("/root/B/H"), 1024, (short) 3, 0);
+ DFSTestUtil.createFile(dfs, new Path("/root/B/I"), 1024, (short) 3, 0);
+
+ // Create /root/D child
+ DFSTestUtil.createFile(dfs, new Path("/root/D/J"), 1024, (short) 3, 0);
+ DFSTestUtil.createFile(dfs, new Path("/root/D/K"), 1024, (short) 3, 0);
+ dfs.mkdirs(new Path("/root/D/L"));
+ DFSTestUtil.createFile(dfs, new Path("/root/D/M"), 1024, (short) 3, 0);
+
+ // Create /root/B/G child
+ DFSTestUtil.createFile(dfs, new Path("/root/B/G/N"), 1024, (short) 3, 0);
+ DFSTestUtil.createFile(dfs, new Path("/root/B/G/O"), 1024, (short) 3, 0);
+ dfs.mkdirs(new Path("/root/B/G/P"));
+
+ // Create /root/D/L child
+ dfs.mkdirs(new Path("/root/D/L/Q"));
+ DFSTestUtil.createFile(dfs, new Path("/root/D/L/R"), 1024, (short) 3, 0);
+ DFSTestUtil.createFile(dfs, new Path("/root/D/L/S"), 1024, (short) 3, 0);
+
+ // Create /root/B/G/P child
+ DFSTestUtil.createFile(dfs, new Path("/root/B/G/P/T"), 1024, (short) 3, 0);
+
+ // Create /root/D/L/Q child
+ DFSTestUtil.createFile(dfs, new Path("/root/D/L/Q/U"), 1024, (short) 3, 0);
+ }
+
+ private List<String> getDFSListOfTree() {
+ List<String> dfsList = new ArrayList<>();
+ dfsList.add("/root/A");
+ dfsList.add("/root/B/F");
+ dfsList.add("/root/B/G/N");
+ dfsList.add("/root/B/G/O");
+ dfsList.add("/root/B/G/P/T");
+ dfsList.add("/root/B/H");
+ dfsList.add("/root/B/I");
+ dfsList.add("/root/C");
+ dfsList.add("/root/D/J");
+ dfsList.add("/root/D/K");
+ dfsList.add("/root/D/L/Q/U");
+ dfsList.add("/root/D/L/R");
+ dfsList.add("/root/D/L/S");
+ dfsList.add("/root/D/M");
+ dfsList.add("/root/E");
+ return dfsList;
+ }
+
private String createFileAndSimulateFavoredNodes(int favoredNodesCount)
throws IOException {
ArrayList<DataNode> dns = hdfsCluster.getDataNodes();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org