Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2018/01/29 04:12:34 UTC

[29/50] [abbrv] hadoop git commit: HDFS-12570: [SPS]: Refactor Co-ordinator datanode logic to track the block storage movements. Contributed by Rakesh R.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/776af85f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index 57e9f94..70219f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -203,11 +203,11 @@ public class TestStoragePolicySatisfier {
   }
 
   /**
-   * Tests to verify that the block storage movement results will be propagated
+   * Tests to verify that the block storage movement report will be propagated
    * to Namenode via datanode heartbeat.
    */
   @Test(timeout = 300000)
-  public void testPerTrackIdBlocksStorageMovementResults() throws Exception {
+  public void testBlksStorageMovementAttemptFinishedReport() throws Exception {
     try {
       createCluster();
       // Change policy to ONE_SSD
@@ -229,7 +229,7 @@ public class TestStoragePolicySatisfier {
       DFSTestUtil.waitExpectedStorageType(
           file, StorageType.DISK, 2, 30000, dfs);
 
-      waitForBlocksMovementResult(1, 30000);
+      waitForBlocksMovementAttemptReport(1, 30000);
     } finally {
       shutdownCluster();
     }
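
[The renamed helper waitForBlocksMovementAttemptReport, whose full diff appears at the bottom of this file's changes, follows the standard GenericTestUtils.waitFor polling pattern: a Supplier is re-evaluated every 100 ms until it returns true or the timeout elapses. A minimal self-contained sketch of that pattern, with the AttemptedItemsMonitor abstracted to a plain AtomicLong counter; the class and method names here are illustrative, not part of the patch:]

import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Supplier;
import org.apache.hadoop.test.GenericTestUtils;

final class MovementReportWaiter {
  /** Poll until the counter reaches the expected value, or time out. */
  static void waitForCount(final AtomicLong counter, final long expected,
      int timeoutMillis) throws TimeoutException, InterruptedException {
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        // Finished-block counts only grow, so ">=" (not "==") is the
        // right check; a single trackId may cover several blocks.
        return counter.get() >= expected;
      }
    }, 100, timeoutMillis); // re-evaluate every 100 ms until timeout
  }
}
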
@@ -276,7 +276,7 @@ public class TestStoragePolicySatisfier {
             fileName, StorageType.DISK, 2, 30000, dfs);
       }
 
-      waitForBlocksMovementResult(files.size(), 30000);
+      waitForBlocksMovementAttemptReport(files.size(), 30000);
     } finally {
       shutdownCluster();
     }
@@ -457,7 +457,7 @@ public class TestStoragePolicySatisfier {
       DFSTestUtil.waitExpectedStorageType(
           file, StorageType.DISK, 2, 30000, dfs);
 
-      waitForBlocksMovementResult(1, 30000);
+      waitForBlocksMovementAttemptReport(1, 30000);
     } finally {
       shutdownCluster();
     }
@@ -630,7 +630,7 @@ public class TestStoragePolicySatisfier {
       // No block movement will be scheduled as there is no target node
       // available with the required storage type.
       waitForAttemptedItems(1, 30000);
-      waitForBlocksMovementResult(1, 30000);
+      waitForBlocksMovementAttemptReport(1, 30000);
       DFSTestUtil.waitExpectedStorageType(
           file1, StorageType.ARCHIVE, 1, 30000, dfs);
       DFSTestUtil.waitExpectedStorageType(
@@ -691,7 +691,7 @@ public class TestStoragePolicySatisfier {
       DFSTestUtil.waitExpectedStorageType(
           file, StorageType.DISK, 3, 30000, dfs);
 
-      waitForBlocksMovementResult(1, 30000);
+      waitForBlocksMovementAttemptReport(1, 30000);
     } finally {
       shutdownCluster();
     }
@@ -871,7 +871,7 @@ public class TestStoragePolicySatisfier {
       Set<DatanodeDescriptor> dns = hdfsCluster.getNamesystem()
           .getBlockManager().getDatanodeManager().getDatanodes();
       for (DatanodeDescriptor dd : dns) {
-        assertNull(dd.getBlocksToMoveStorages());
+        assertNull(dd.getBlocksToMoveStorages(1));
       }
 
       // Enable heart beats now
@@ -1224,7 +1224,7 @@ public class TestStoragePolicySatisfier {
   /**
    * Test SPS for batch processing.
    */
-  @Test(timeout = 300000)
+  @Test(timeout = 3000000)
   public void testBatchProcessingForSPSDirectory() throws Exception {
     try {
       StorageType[][] diskTypes = new StorageType[][] {
@@ -1252,7 +1252,7 @@ public class TestStoragePolicySatisfier {
         DFSTestUtil.waitExpectedStorageType(fileName, StorageType.ARCHIVE, 2,
             30000, dfs);
       }
-      waitForBlocksMovementResult(files.size(), 30000);
+      waitForBlocksMovementAttemptReport(files.size(), 30000);
       String expectedLogMessage = "StorageMovementNeeded queue remaining"
           + " capacity is zero";
       assertTrue("Log output does not contain expected log message: "
@@ -1268,7 +1268,7 @@ public class TestStoragePolicySatisfier {
    *  1. Delete /root when traversing Q
   *  2. U, R, S should not be queued.
    */
-  @Test
+  @Test(timeout = 300000)
   public void testTraverseWhenParentDeleted() throws Exception {
     StorageType[][] diskTypes = new StorageType[][] {
         {StorageType.DISK, StorageType.ARCHIVE},
@@ -1330,7 +1330,7 @@ public class TestStoragePolicySatisfier {
    *  1. Delete L when traversing Q
   *  2. E, M, U, R, S should not be queued.
    */
-  @Test
+  @Test(timeout = 300000)
   public void testTraverseWhenRootParentDeleted() throws Exception {
     StorageType[][] diskTypes = new StorageType[][] {
         {StorageType.DISK, StorageType.ARCHIVE},
@@ -1387,6 +1387,82 @@ public class TestStoragePolicySatisfier {
     dfs.delete(new Path("/root"), true);
   }
 
+  /**
+   * Test storage block moves while under-replication tasks exist in the
+   * system, so both share the DataNode max transfer streams.
+   *
+   * 1. Create a cluster with 2 datanodes.
+   * 2. Create 20 files with 2 replicas each.
+   * 3. Start 2 more DNs with DISK & SSD storage types.
+   * 4. Set replication to 4 on the first 10 files to trigger replica tasks.
+   * 5. Set the ALL_SSD policy on the second set of files (11-20).
+   * 6. Call satisfyStoragePolicy on files 11-20 to move blocks to new DNs.
+   * 7. Wait for the under-replication and SPS tasks to complete.
+   */
+  @Test(timeout = 300000)
+  public void testMoveBlocksWithUnderReplicatedBlocks() throws Exception {
+    try {
+      config.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 3);
+      config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+      config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+          true);
+      config.set(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
+          "3000");
+      config.setBoolean(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY,
+          true);
+
+      StorageType[][] storagetypes = new StorageType[][] {
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK}};
+      hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(2)
+          .storageTypes(storagetypes).build();
+      hdfsCluster.waitActive();
+      dfs = hdfsCluster.getFileSystem();
+
+      // Create the files used by the replication and SPS steps below.
+      for (int i = 1; i <= 20; i++) {
+        Path filePath = new Path("/file" + i);
+        DFSTestUtil.createFile(dfs, filePath, DEFAULT_BLOCK_SIZE * 5, (short) 2,
+            0);
+      }
+
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.DISK, StorageType.SSD},
+              {StorageType.DISK, StorageType.SSD}};
+      startAdditionalDNs(config, 2, numOfDatanodes, newtypes,
+          storagesPerDatanode, capacity, hdfsCluster);
+
+      // Increase the replication factor to 4 for the first 10 files, thus
+      // initiating replication tasks.
+      for (int i = 1; i <= 10; i++) {
+        Path filePath = new Path("/file" + i);
+        dfs.setReplication(filePath, (short) 4);
+      }
+
+      // Invoke SPS for files 11-20.
+      for (int i = 11; i <= 20; i++) {
+        Path filePath = new Path("/file" + i);
+        dfs.setStoragePolicy(filePath, "ALL_SSD");
+        dfs.satisfyStoragePolicy(filePath);
+      }
+
+      for (int i = 1; i <= 10; i++) {
+        Path filePath = new Path("/file" + i);
+        DFSTestUtil.waitExpectedStorageType(filePath.toString(),
+            StorageType.DISK, 4, 30000, hdfsCluster.getFileSystem());
+      }
+      for (int i = 11; i <= 20; i++) {
+        Path filePath = new Path("/file" + i);
+        DFSTestUtil.waitExpectedStorageType(filePath.toString(),
+            StorageType.SSD, 2, 30000, hdfsCluster.getFileSystem());
+      }
+    } finally {
+      shutdownCluster();
+    }
+  }
+
   private static void createDirectoryTree(DistributedFileSystem dfs)
       throws Exception {
     // tree structure
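
[The new test above hinges on two settings: the per-DataNode replication stream cap and the SPS flag that makes block moves share that cap. A minimal configuration sketch using only keys that appear in this patch; the wrapper class is illustrative:]

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

final class SharedStreamsConfig {
  static Configuration create() {
    Configuration conf = new Configuration();
    // Cap concurrent replication streams per DataNode.
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 3);
    conf.setBoolean(
        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, true);
    // When true, SPS block moves draw from the same per-DN stream budget
    // as replication work instead of getting a separate one.
    conf.setBoolean(DFSConfigKeys
        .DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY,
        true);
    return conf;
  }
}
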
@@ -1514,18 +1590,19 @@ public class TestStoragePolicySatisfier {
     }, 100, timeout);
   }
 
-  private void waitForBlocksMovementResult(long expectedBlkMovResultsCount,
-      int timeout) throws TimeoutException, InterruptedException {
+  private void waitForBlocksMovementAttemptReport(
+      long expectedMovementFinishedBlocksCount, int timeout)
+          throws TimeoutException, InterruptedException {
     BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
     final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
-        LOG.info("expectedResultsCount={} actualResultsCount={}",
-            expectedBlkMovResultsCount,
-            sps.getAttemptedItemsMonitor().resultsCount());
-        return sps.getAttemptedItemsMonitor()
-            .resultsCount() == expectedBlkMovResultsCount;
+        LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
+            expectedMovementFinishedBlocksCount,
+            sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount());
+        return sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount()
+            >= expectedMovementFinishedBlocksCount;
       }
     }, 100, timeout);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/776af85f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
index fc5d0a5..154ddae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
@@ -180,7 +180,7 @@ public class TestStoragePolicySatisfierWithStripedFile {
       LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy");
       cluster.triggerHeartbeats();
 
-      waitForBlocksMovementResult(cluster, 1, 60000);
+      waitForBlocksMovementAttemptReport(cluster, 9, 60000);
       // verify storage types and locations
       waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.ARCHIVE, 9,
           9, 60000);
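
[With the monitor now counting individual finished block moves rather than per-trackId results, the expected counts above follow from the striping layout: a full block group under an RS-6-3 style policy has 6 data + 3 parity = 9 internal blocks, matching the 9 ARCHIVE storages asserted in this hunk; the next hunk's expectation of 5 likewise matches the 5 storages it asserts.]
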
@@ -290,7 +290,7 @@ public class TestStoragePolicySatisfierWithStripedFile {
       LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy");
       cluster.triggerHeartbeats();
 
-      waitForBlocksMovementResult(cluster, 1, 60000);
+      waitForBlocksMovementAttemptReport(cluster, 5, 60000);
       waitForAttemptedItems(cluster, 1, 30000);
       // verify storage types and locations.
       waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.ARCHIVE, 5,
@@ -556,10 +556,10 @@ public class TestStoragePolicySatisfierWithStripedFile {
     }, 100, timeout);
   }
 
-  // Check whether the block movement result has been arrived at the
+  // Check whether the block movement attempt report has arrived at the
   // Namenode(SPS).
-  private void waitForBlocksMovementResult(MiniDFSCluster cluster,
-      long expectedBlkMovResultsCount, int timeout)
+  private void waitForBlocksMovementAttemptReport(MiniDFSCluster cluster,
+      long expectedMovementFinishedBlocksCount, int timeout)
           throws TimeoutException, InterruptedException {
     BlockManager blockManager = cluster.getNamesystem().getBlockManager();
     final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
@@ -568,11 +568,11 @@ public class TestStoragePolicySatisfierWithStripedFile {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
-        LOG.info("expectedResultsCount={} actualResultsCount={}",
-            expectedBlkMovResultsCount,
-            sps.getAttemptedItemsMonitor().resultsCount());
-        return sps.getAttemptedItemsMonitor()
-            .resultsCount() == expectedBlkMovResultsCount;
+        LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
+            expectedMovementFinishedBlocksCount,
+            sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount());
+        return sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount()
+            >= expectedMovementFinishedBlocksCount;
       }
     }, 100, timeout);
   }

