You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2018/01/29 04:12:48 UTC

[43/50] [abbrv] hadoop git commit: HDFS-12911. [SPS]: Modularize the SPS code and expose necessary interfaces for external/internal implementations. Contributed by Uma Maheswara Rao G

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50b9c4c9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
index 2a7bde5..9354044 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
@@ -72,7 +72,6 @@ import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.event.Level;
@@ -147,12 +146,11 @@ public class TestStoragePolicySatisfier {
     startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
         storagesPerDatanode, capacity, hdfsCluster);
 
-    dfs.satisfyStoragePolicy(new Path(file));
-
     hdfsCluster.triggerHeartbeats();
+    dfs.satisfyStoragePolicy(new Path(file));
     // Wait till namenode notified about the block location details
-    DFSTestUtil.waitExpectedStorageType(
-        file, StorageType.ARCHIVE, 3, 30000, dfs);
+    DFSTestUtil.waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 35000,
+        dfs);
   }
 
   @Test(timeout = 300000)
@@ -1284,6 +1282,7 @@ public class TestStoragePolicySatisfier {
         {StorageType.ARCHIVE, StorageType.SSD},
         {StorageType.DISK, StorageType.DISK}};
     config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+    config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10);
     hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
         storagesPerDatanode, capacity);
     dfs = hdfsCluster.getFileSystem();
@@ -1299,19 +1298,28 @@ public class TestStoragePolicySatisfier {
 
     //Queue limit can control the traverse logic to wait for some free
     //entry in queue. After 10 files, traverse control will be on U.
-    StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
-    Mockito.when(sps.isRunning()).thenReturn(true);
-    Context ctxt = Mockito.mock(Context.class);
-    config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10);
-    Mockito.when(ctxt.getConf()).thenReturn(config);
-    Mockito.when(ctxt.isRunning()).thenReturn(true);
-    Mockito.when(ctxt.isInSafeMode()).thenReturn(false);
-    Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true);
-    BlockStorageMovementNeeded movmentNeededQueue =
-        new BlockStorageMovementNeeded(ctxt);
+    StoragePolicySatisfier sps = new StoragePolicySatisfier(config);
+    Context ctxt = new IntraSPSNameNodeContext(hdfsCluster.getNamesystem(),
+        hdfsCluster.getNamesystem().getBlockManager(), sps) {
+      @Override
+      public boolean isInSafeMode() {
+        return false;
+      }
+
+      @Override
+      public boolean isRunning() {
+        return true;
+      }
+    };
+
+    FileIdCollector fileIDCollector =
+        new IntraSPSNameNodeFileIdCollector(fsDir, sps);
+    sps.init(ctxt, fileIDCollector, null);
+    sps.getStorageMovementQueue().activate();
+
     INode rootINode = fsDir.getINode("/root");
-    movmentNeededQueue.addToPendingDirQueue(rootINode.getId());
-    movmentNeededQueue.init(fsDir);
+    hdfsCluster.getNamesystem().getBlockManager()
+        .addSPSPathId(rootINode.getId());
 
     //Wait for thread to reach U.
     Thread.sleep(1000);
@@ -1321,7 +1329,7 @@ public class TestStoragePolicySatisfier {
     // Remove 10 element and make queue free, So other traversing will start.
     for (int i = 0; i < 10; i++) {
       String path = expectedTraverseOrder.remove(0);
-      long trackId = movmentNeededQueue.get().getTrackId();
+      long trackId = sps.getStorageMovementQueue().get().getFileId();
       INode inode = fsDir.getInode(trackId);
       assertTrue("Failed to traverse tree, expected " + path + " but got "
           + inode.getFullPathName(), path.equals(inode.getFullPathName()));
@@ -1332,7 +1340,7 @@ public class TestStoragePolicySatisfier {
     // Check other element traversed in order and R,S should not be added in
     // queue which we already removed from expected list
     for (String path : expectedTraverseOrder) {
-      long trackId = movmentNeededQueue.get().getTrackId();
+      long trackId = sps.getStorageMovementQueue().get().getFileId();
       INode inode = fsDir.getInode(trackId);
       assertTrue("Failed to traverse tree, expected " + path + " but got "
           + inode.getFullPathName(), path.equals(inode.getFullPathName()));
@@ -1352,6 +1360,7 @@ public class TestStoragePolicySatisfier {
         {StorageType.ARCHIVE, StorageType.SSD},
         {StorageType.DISK, StorageType.DISK}};
     config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+    config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10);
     hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
         storagesPerDatanode, capacity);
     dfs = hdfsCluster.getFileSystem();
@@ -1366,21 +1375,33 @@ public class TestStoragePolicySatisfier {
     expectedTraverseOrder.remove("/root/D/M");
     expectedTraverseOrder.remove("/root/E");
     FSDirectory fsDir = hdfsCluster.getNamesystem().getFSDirectory();
-    StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
-    Mockito.when(sps.isRunning()).thenReturn(true);
+
     // Queue limit can control the traverse logic to wait for some free
     // entry in queue. After 10 files, traverse control will be on U.
-    Context ctxt = Mockito.mock(Context.class);
-    config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10);
-    Mockito.when(ctxt.getConf()).thenReturn(config);
-    Mockito.when(ctxt.isRunning()).thenReturn(true);
-    Mockito.when(ctxt.isInSafeMode()).thenReturn(false);
-    Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true);
-    BlockStorageMovementNeeded movmentNeededQueue =
-        new BlockStorageMovementNeeded(ctxt);
-    movmentNeededQueue.init(fsDir);
+    // Use a real satisfier backed by the NameNode context, not a mock.
+    StoragePolicySatisfier sps = new StoragePolicySatisfier(config);
+    Context ctxt = new IntraSPSNameNodeContext(hdfsCluster.getNamesystem(),
+        hdfsCluster.getNamesystem().getBlockManager(), sps) {
+      @Override
+      public boolean isInSafeMode() {
+        return false;
+      }
+
+      @Override
+      public boolean isRunning() {
+        return true;
+      }
+    };
+
+    FileIdCollector fileIDCollector =
+        new IntraSPSNameNodeFileIdCollector(fsDir, sps);
+    sps.init(ctxt, fileIDCollector, null);
+    sps.getStorageMovementQueue().activate();
+
     INode rootINode = fsDir.getINode("/root");
-    movmentNeededQueue.addToPendingDirQueue(rootINode.getId());
+    hdfsCluster.getNamesystem().getBlockManager()
+        .addSPSPathId(rootINode.getId());
+
     // Wait for thread to reach U.
     Thread.sleep(1000);
 
@@ -1389,7 +1410,7 @@ public class TestStoragePolicySatisfier {
     // Remove 10 element and make queue free, So other traversing will start.
     for (int i = 0; i < 10; i++) {
       String path = expectedTraverseOrder.remove(0);
-      long trackId = movmentNeededQueue.get().getTrackId();
+      long trackId = sps.getStorageMovementQueue().get().getFileId();
       INode inode = fsDir.getInode(trackId);
       assertTrue("Failed to traverse tree, expected " + path + " but got "
           + inode.getFullPathName(), path.equals(inode.getFullPathName()));
@@ -1400,7 +1421,7 @@ public class TestStoragePolicySatisfier {
     // Check other element traversed in order and E, M, U, R, S should not be
     // added in queue which we already removed from expected list
     for (String path : expectedTraverseOrder) {
-      long trackId = movmentNeededQueue.get().getTrackId();
+      long trackId = sps.getStorageMovementQueue().get().getFileId();
       INode inode = fsDir.getInode(trackId);
       assertTrue("Failed to traverse tree, expected " + path + " but got "
           + inode.getFullPathName(), path.equals(inode.getFullPathName()));
@@ -1502,17 +1523,20 @@ public class TestStoragePolicySatisfier {
       hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(2)
           .storageTypes(storagetypes).build();
       hdfsCluster.waitActive();
-      BlockStorageMovementNeeded.setStatusClearanceElapsedTimeMs(20000);
+      // BlockStorageMovementNeeded.setStatusClearanceElapsedTimeMs(200000);
       dfs = hdfsCluster.getFileSystem();
       Path filePath = new Path("/file");
       DFSTestUtil.createFile(dfs, filePath, 1024, (short) 2,
             0);
       dfs.setStoragePolicy(filePath, "COLD");
       dfs.satisfyStoragePolicy(filePath);
+      Thread.sleep(3000);
       StoragePolicySatisfyPathStatus status = dfs.getClient()
           .checkStoragePolicySatisfyPathStatus(filePath.toString());
-      Assert.assertTrue("Status should be IN_PROGRESS",
-          StoragePolicySatisfyPathStatus.IN_PROGRESS.equals(status));
+      Assert.assertTrue(
+          "Status should be IN_PROGRESS/SUCCESS, but status is " + status,
+          StoragePolicySatisfyPathStatus.IN_PROGRESS.equals(status)
+              || StoragePolicySatisfyPathStatus.SUCCESS.equals(status));
       DFSTestUtil.waitExpectedStorageType(filePath.toString(),
           StorageType.ARCHIVE, 2, 30000, dfs);
 
@@ -1530,7 +1554,7 @@ public class TestStoragePolicySatisfier {
           return false;
         }
       }, 100, 60000);
-
+      BlockStorageMovementNeeded.setStatusClearanceElapsedTimeMs(1000);
       // wait till status is NOT_AVAILABLE
       GenericTestUtils.waitFor(new Supplier<Boolean>() {
         @Override
@@ -1719,8 +1743,10 @@ public class TestStoragePolicySatisfier {
       public Boolean get() {
         LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
             expectedBlkMovAttemptedCount,
-            sps.getAttemptedItemsMonitor().getAttemptedItemsCount());
-        return sps.getAttemptedItemsMonitor()
+            ((BlockStorageMovementAttemptedItems) (sps
+                .getAttemptedItemsMonitor())).getAttemptedItemsCount());
+        return ((BlockStorageMovementAttemptedItems) (sps
+            .getAttemptedItemsMonitor()))
             .getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
       }
     }, 100, timeout);
@@ -1736,8 +1762,11 @@ public class TestStoragePolicySatisfier {
       public Boolean get() {
         LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
             expectedMovementFinishedBlocksCount,
-            sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount());
-        return sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount()
+            ((BlockStorageMovementAttemptedItems) (sps
+                .getAttemptedItemsMonitor())).getMovementFinishedBlocksCount());
+        return ((BlockStorageMovementAttemptedItems) (sps
+            .getAttemptedItemsMonitor()))
+                .getMovementFinishedBlocksCount()
             >= expectedMovementFinishedBlocksCount;
       }
     }, 100, timeout);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50b9c4c9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java
index c1a2b8b..0e3a5a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java
@@ -500,9 +500,11 @@ public class TestStoragePolicySatisfierWithStripedFile {
       public Boolean get() {
         LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
             expectedBlkMovAttemptedCount,
-            sps.getAttemptedItemsMonitor().getAttemptedItemsCount());
-        return sps.getAttemptedItemsMonitor()
-            .getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
+            ((BlockStorageMovementAttemptedItems) sps
+                .getAttemptedItemsMonitor()).getAttemptedItemsCount());
+        return ((BlockStorageMovementAttemptedItems) sps
+            .getAttemptedItemsMonitor())
+                .getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
       }
     }, 100, timeout);
   }
@@ -560,7 +562,7 @@ public class TestStoragePolicySatisfierWithStripedFile {
   // Check whether the block movement attempt report has been arrived at the
   // Namenode(SPS).
   private void waitForBlocksMovementAttemptReport(MiniDFSCluster cluster,
-      long expectedMovementFinishedBlocksCount, int timeout)
+      long expectedMoveFinishedBlks, int timeout)
           throws TimeoutException, InterruptedException {
     BlockManager blockManager = cluster.getNamesystem().getBlockManager();
     final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
@@ -570,10 +572,11 @@ public class TestStoragePolicySatisfierWithStripedFile {
       @Override
       public Boolean get() {
         LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
-            expectedMovementFinishedBlocksCount,
-            sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount());
-        return sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount()
-            >= expectedMovementFinishedBlocksCount;
+            expectedMoveFinishedBlks, ((BlockStorageMovementAttemptedItems) sps
+                .getAttemptedItemsMonitor()).getMovementFinishedBlocksCount());
+        return ((BlockStorageMovementAttemptedItems) sps
+            .getAttemptedItemsMonitor())
+                .getMovementFinishedBlocksCount() >= expectedMoveFinishedBlks;
       }
     }, 100, timeout);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org