Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2018/01/29 04:12:32 UTC

[27/50] [abbrv] hadoop git commit: HDFS-12291: [SPS]: Provide a mechanism to recursively iterate and satisfy storage policy of all the files under the given dir. Contributed by Surendra Singh Lilhore.
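
In short, the change lets a client set a storage policy on a directory and have the NameNode satisfy it recursively for every file underneath. A minimal sketch of that flow, mirroring the tests in this diff (the /root path and COLD policy are taken from the tests; this is illustrative, not prescribed API usage):

    // Assumes a running MiniDFSCluster and a DistributedFileSystem handle.
    DistributedFileSystem dfs = cluster.getFileSystem();
    dfs.setStoragePolicy(new Path("/root"), "COLD");
    // Before HDFS-12291 files in sub-directories were not covered (see the
    // updated expectations in the first hunk below); now every file under
    // /root is queued for block movement.
    dfs.satisfyStoragePolicy(new Path("/root"));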

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index 3375590..57e9f94 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -21,6 +21,9 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KE
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.slf4j.LoggerFactory.getLogger;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -61,8 +64,10 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
 
 import com.google.common.base.Supplier;
 
@@ -71,6 +76,12 @@ import com.google.common.base.Supplier;
  * moved and finding its suggested target locations to move.
  */
 public class TestStoragePolicySatisfier {
+
+  {
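+    // Raise FSTreeTraverser logging to DEBUG so the LogCapturer assertions
+    // in testBatchProcessingForSPSDirectory can capture its messages.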
+    GenericTestUtils.setLogLevel(
+        getLogger(FSTreeTraverser.class), Level.DEBUG);
+  }
+
   private static final String ONE_SSD = "ONE_SSD";
   private static final String COLD = "COLD";
   private static final Logger LOG =
@@ -341,7 +352,9 @@ public class TestStoragePolicySatisfier {
 
      // now takes effect for the sub-dir's file in the directory as well.
       DFSTestUtil.waitExpectedStorageType(
-          subFile2, StorageType.DEFAULT, 3, 30000, dfs);
+          subFile2, StorageType.SSD, 1, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(
+          subFile2, StorageType.DISK, 2, 30000, dfs);
     } finally {
       shutdownCluster();
     }
@@ -1083,6 +1096,368 @@ public class TestStoragePolicySatisfier {
     }
   }
 
+  /**
+   * Test SPS for an empty directory; the satisfy xAttr should be removed.
+   */
+  @Test(timeout = 300000)
+  public void testSPSForEmptyDirectory() throws IOException, TimeoutException,
+      InterruptedException {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+          true);
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path emptyDir = new Path("/emptyDir");
+      fs.mkdirs(emptyDir);
+      fs.satisfyStoragePolicy(emptyDir);
+      // Make sure satisfy xattr has been removed.
+      DFSTestUtil.waitForXattrRemoved("/emptyDir",
+          XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test SPS for a directory that does not exist.
+   */
+  @Test(timeout = 300000)
+  public void testSPSForNonExistDirectory() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+          true);
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path emptyDir = new Path("/emptyDir");
+      try {
+        fs.satisfyStoragePolicy(emptyDir);
+        fail("FileNotFoundException should be thrown");
+      } catch (FileNotFoundException e) {
+        // nothing to do
+      }
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test SPS for a directory tree that does not contain any files.
+   */
+  @Test(timeout = 300000)
+  public void testSPSWithDirectoryTreeWithoutFile() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+          true);
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      cluster.waitActive();
+      // Create directories
+      /*
+       *                   root
+       *                    |
+       *           A--------C--------D
+       *                    |
+       *               G----H----I
+       *                    |
+       *                    O
+       */
+      DistributedFileSystem fs = cluster.getFileSystem();
+      fs.mkdirs(new Path("/root/C/H/O"));
+      fs.mkdirs(new Path("/root/A"));
+      fs.mkdirs(new Path("/root/D"));
+      fs.mkdirs(new Path("/root/C/G"));
+      fs.mkdirs(new Path("/root/C/I"));
+      fs.satisfyStoragePolicy(new Path("/root"));
+      // Make sure satisfy xattr has been removed.
+      DFSTestUtil.waitForXattrRemoved("/root",
+          XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test SPS for a directory with multiple levels of sub-directories.
+   */
+  @Test(timeout = 300000)
+  public void testMultipleLevelDirectoryForSatisfyStoragePolicy()
+      throws Exception {
+    try {
+      StorageType[][] diskTypes = new StorageType[][] {
+          {StorageType.DISK, StorageType.ARCHIVE},
+          {StorageType.ARCHIVE, StorageType.SSD},
+          {StorageType.DISK, StorageType.DISK}};
+      config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+      config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+          true);
+      hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
+          storagesPerDatanode, capacity);
+      dfs = hdfsCluster.getFileSystem();
+      createDirectoryTree(dfs);
+
+      List<String> files = getDFSListOfTree();
+      dfs.setStoragePolicy(new Path("/root"), COLD);
+      dfs.satisfyStoragePolicy(new Path("/root"));
+      for (String fileName : files) {
+        // Wait till the block is moved to ARCHIVE
+        DFSTestUtil.waitExpectedStorageType(fileName, StorageType.ARCHIVE, 2,
+            30000, dfs);
+      }
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Test SPS for batch processing.
+   */
+  @Test(timeout = 300000)
+  public void testBatchProcessingForSPSDirectory() throws Exception {
+    try {
+      StorageType[][] diskTypes = new StorageType[][] {
+          {StorageType.DISK, StorageType.ARCHIVE},
+          {StorageType.ARCHIVE, StorageType.SSD},
+          {StorageType.DISK, StorageType.DISK}};
+      config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+      config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+          true);
+      // Set queue max capacity
+      config.setInt(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
+          5);
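+      // The tree created below contains 15 files, so a queue limit of 5
+      // guarantees the traverser fills the queue and logs the "remaining
+      // capacity is zero" message asserted at the end of this test.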
+      hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
+          storagesPerDatanode, capacity);
+      dfs = hdfsCluster.getFileSystem();
+      createDirectoryTree(dfs);
+      List<String> files = getDFSListOfTree();
+      LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(LogFactory
+          .getLog(FSTreeTraverser.class));
+
+      dfs.setStoragePolicy(new Path("/root"), COLD);
+      dfs.satisfyStoragePolicy(new Path("/root"));
+      for (String fileName : files) {
+        // Wait till the block is moved to ARCHIVE
+        DFSTestUtil.waitExpectedStorageType(fileName, StorageType.ARCHIVE, 2,
+            30000, dfs);
+      }
+      waitForBlocksMovementResult(files.size(), 30000);
+      String expectedLogMessage = "StorageMovementNeeded queue remaining"
+          + " capacity is zero";
+      assertTrue("Log output does not contain expected log message: "
+          + expectedLogMessage, logs.getOutput().contains(expectedLogMessage));
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Test traversal when the parent directory is deleted:
+   * 1. Delete /root/D/L while Q is being traversed.
+   * 2. U, R and S should then not be queued.
+   */
+  @Test
+  public void testTraverseWhenParentDeleted() throws Exception {
+    StorageType[][] diskTypes = new StorageType[][] {
+        {StorageType.DISK, StorageType.ARCHIVE},
+        {StorageType.ARCHIVE, StorageType.SSD},
+        {StorageType.DISK, StorageType.DISK}};
+    config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+    hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
+        storagesPerDatanode, capacity);
+    dfs = hdfsCluster.getFileSystem();
+    createDirectoryTree(dfs);
+
+    List<String> expectedTraverseOrder = getDFSListOfTree();
+
+    // Remove the files that will not be traversed once their parent is deleted.
+    expectedTraverseOrder.remove("/root/D/L/R");
+    expectedTraverseOrder.remove("/root/D/L/S");
+    expectedTraverseOrder.remove("/root/D/L/Q/U");
+    FSDirectory fsDir = hdfsCluster.getNamesystem().getFSDirectory();
+
+    // The queue limit makes the traversal wait for a free entry in the
+    // queue. With a limit of 10 and 15 files, the traverser blocks at U,
+    // the 11th file in DFS order.
+    StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
+    Mockito.when(sps.isRunning()).thenReturn(true);
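+    // The satisfier itself is mocked; this test exercises only the
+    // traversal and queueing logic in BlockStorageMovementNeeded.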
+    BlockStorageMovementNeeded movementNeededQueue =
+        new BlockStorageMovementNeeded(fsDir.getFSNamesystem(), sps, 10);
+    INode rootINode = fsDir.getINode("/root");
+    movementNeededQueue.addToPendingDirQueue(rootINode.getId());
+    movementNeededQueue.init();
+
+    // Wait for the traversal thread to reach U.
+    Thread.sleep(1000);
+
+    dfs.delete(new Path("/root/D/L"), true);
+
+    // Remove 10 elements to free the queue so that traversal resumes.
+    for (int i = 0; i < 10; i++) {
+      String path = expectedTraverseOrder.remove(0);
+      long trackId = movementNeededQueue.get().getTrackId();
+      INode inode = fsDir.getInode(trackId);
+      assertTrue("Failed to traverse tree, expected " + path + " but got "
+          + inode.getFullPathName(), path.equals(inode.getFullPathName()));
+    }
+    // Wait for the tree traversal to finish.
+    Thread.sleep(5000);
+
+    // Check that the remaining elements are traversed in order; R and S,
+    // already removed from the expected list, must not show up in the queue.
+    for (String path : expectedTraverseOrder) {
+      long trackId = movementNeededQueue.get().getTrackId();
+      INode inode = fsDir.getInode(trackId);
+      assertTrue("Failed to traverse tree, expected " + path + " but got "
+          + inode.getFullPathName(), path.equals(inode.getFullPathName()));
+    }
+    dfs.delete(new Path("/root"), true);
+  }
+
+  /**
+   * Test traversal when the root parent is deleted:
+   * 1. Delete L (/root/D/L) while Q is being traversed.
+   * 2. E, M, U, R and S should then not be queued.
+   */
+  @Test
+  public void testTraverseWhenRootParentDeleted() throws Exception {
+    StorageType[][] diskTypes = new StorageType[][] {
+        {StorageType.DISK, StorageType.ARCHIVE},
+        {StorageType.ARCHIVE, StorageType.SSD},
+        {StorageType.DISK, StorageType.DISK}};
+    config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+    hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
+        storagesPerDatanode, capacity);
+    dfs = hdfsCluster.getFileSystem();
+    createDirectoryTree(dfs);
+
+    List<String> expectedTraverseOrder = getDFSListOfTree();
+
+    // Remove the files that will not be traversed once their parent is deleted.
+    expectedTraverseOrder.remove("/root/D/L/R");
+    expectedTraverseOrder.remove("/root/D/L/S");
+    expectedTraverseOrder.remove("/root/D/L/Q/U");
+    expectedTraverseOrder.remove("/root/D/M");
+    expectedTraverseOrder.remove("/root/E");
+    FSDirectory fsDir = hdfsCluster.getNamesystem().getFSDirectory();
+    StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
+    Mockito.when(sps.isRunning()).thenReturn(true);
+    // The queue limit makes the traversal wait for a free entry in the
+    // queue. With a limit of 10 and 15 files, the traverser blocks at U,
+    // the 11th file in DFS order.
+    BlockStorageMovementNeeded movementNeededQueue =
+        new BlockStorageMovementNeeded(fsDir.getFSNamesystem(), sps, 10);
+    movementNeededQueue.init();
+    INode rootINode = fsDir.getINode("/root");
+    movementNeededQueue.addToPendingDirQueue(rootINode.getId());
+    // Wait for the traversal thread to reach U.
+    Thread.sleep(1000);
+
+    dfs.delete(new Path("/root/D/L"), true);
+
+    // Remove 10 elements to free the queue so that traversal resumes.
+    for (int i = 0; i < 10; i++) {
+      String path = expectedTraverseOrder.remove(0);
+      long trackId = movementNeededQueue.get().getTrackId();
+      INode inode = fsDir.getInode(trackId);
+      assertTrue("Failed to traverse tree, expected " + path + " but got "
+          + inode.getFullPathName(), path.equals(inode.getFullPathName()));
+    }
+    // Wait for the tree traversal to finish.
+    Thread.sleep(5000);
+
+    // Check that the remaining elements are traversed in order; E, M, U, R
+    // and S, already removed from the expected list, must not be queued.
+    for (String path : expectedTraverseOrder) {
+      long trackId = movementNeededQueue.get().getTrackId();
+      INode inode = fsDir.getInode(trackId);
+      assertTrue("Failed to traverse tree, expected " + path + " but got "
+          + inode.getFullPathName(), path.equals(inode.getFullPathName()));
+    }
+    dfs.delete(new Path("/root"), true);
+  }
+
+  private static void createDirectoryTree(DistributedFileSystem dfs)
+      throws Exception {
+    // tree structure
+    /*
+     *                           root
+     *                             |
+     *           A--------B--------C--------D--------E
+     *                    |                 |
+     *          F----G----H----I       J----K----L----M
+     *               |                           |
+     *          N----O----P                 Q----R----S
+     *                    |                 |
+     *                    T                 U
+     */
+    // create root Node and child
+    dfs.mkdirs(new Path("/root"));
+    DFSTestUtil.createFile(dfs, new Path("/root/A"), 1024, (short) 3, 0);
+    dfs.mkdirs(new Path("/root/B"));
+    DFSTestUtil.createFile(dfs, new Path("/root/C"), 1024, (short) 3, 0);
+    dfs.mkdirs(new Path("/root/D"));
+    DFSTestUtil.createFile(dfs, new Path("/root/E"), 1024, (short) 3, 0);
+
+    // Create /root/B child
+    DFSTestUtil.createFile(dfs, new Path("/root/B/F"), 1024, (short) 3, 0);
+    dfs.mkdirs(new Path("/root/B/G"));
+    DFSTestUtil.createFile(dfs, new Path("/root/B/H"), 1024, (short) 3, 0);
+    DFSTestUtil.createFile(dfs, new Path("/root/B/I"), 1024, (short) 3, 0);
+
+    // Create /root/D child
+    DFSTestUtil.createFile(dfs, new Path("/root/D/J"), 1024, (short) 3, 0);
+    DFSTestUtil.createFile(dfs, new Path("/root/D/K"), 1024, (short) 3, 0);
+    dfs.mkdirs(new Path("/root/D/L"));
+    DFSTestUtil.createFile(dfs, new Path("/root/D/M"), 1024, (short) 3, 0);
+
+    // Create /root/B/G child
+    DFSTestUtil.createFile(dfs, new Path("/root/B/G/N"), 1024, (short) 3, 0);
+    DFSTestUtil.createFile(dfs, new Path("/root/B/G/O"), 1024, (short) 3, 0);
+    dfs.mkdirs(new Path("/root/B/G/P"));
+
+    // Create /root/D/L child
+    dfs.mkdirs(new Path("/root/D/L/Q"));
+    DFSTestUtil.createFile(dfs, new Path("/root/D/L/R"), 1024, (short) 3, 0);
+    DFSTestUtil.createFile(dfs, new Path("/root/D/L/S"), 1024, (short) 3, 0);
+
+    // Create /root/B/G/P child
+    DFSTestUtil.createFile(dfs, new Path("/root/B/G/P/T"), 1024, (short) 3, 0);
+
+    // Create /root/D/L/Q child
+    DFSTestUtil.createFile(dfs, new Path("/root/D/L/Q/U"), 1024, (short) 3, 0);
+  }
+
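+  /**
+   * Returns the 15 files of the tree built by createDirectoryTree() in
+   * depth-first order (directories are descended into but not listed).
+   */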
+  private List<String> getDFSListOfTree() {
+    List<String> dfsList = new ArrayList<>();
+    dfsList.add("/root/A");
+    dfsList.add("/root/B/F");
+    dfsList.add("/root/B/G/N");
+    dfsList.add("/root/B/G/O");
+    dfsList.add("/root/B/G/P/T");
+    dfsList.add("/root/B/H");
+    dfsList.add("/root/B/I");
+    dfsList.add("/root/C");
+    dfsList.add("/root/D/J");
+    dfsList.add("/root/D/K");
+    dfsList.add("/root/D/L/Q/U");
+    dfsList.add("/root/D/L/R");
+    dfsList.add("/root/D/L/S");
+    dfsList.add("/root/D/M");
+    dfsList.add("/root/E");
+    return dfsList;
+  }
+
   private String createFileAndSimulateFavoredNodes(int favoredNodesCount)
       throws IOException {
     ArrayList<DataNode> dns = hdfsCluster.getDataNodes();


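The queue-limit behavior that the two traversal tests above depend on is plain blocking-queue back-pressure: the tree traverser (the producer) blocks once the queue is full and resumes as entries are drained. A standalone sketch of that pattern using only java.util.concurrent (illustrative only, not code from this commit):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class QueueBackPressureSketch {
      public static void main(String[] args) throws InterruptedException {
        // Capacity 10, like the movement-needed queue in the tests.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        Thread traverser = new Thread(() -> {
          for (int id = 0; id < 15; id++) { // 15 "files", as in the tests
            try {
              queue.put(id); // blocks while 10 entries are pending
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              return;
            }
          }
        });
        traverser.start();
        Thread.sleep(1000); // producer is now blocked at the 11th entry
        for (int i = 0; i < 15; i++) {
          System.out.println("processed " + queue.take()); // frees a slot
        }
        traverser.join();
      }
    }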