You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by um...@apache.org on 2018/08/10 03:49:13 UTC
[20/50] [abbrv] hadoop git commit: HDFS-12911. [SPS]: Modularize the
SPS code and expose necessary interfaces for external/internal
implementations. Contributed by Uma Maheswara Rao G
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8400f79/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
index 2a7bde5..9354044 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
@@ -72,7 +72,6 @@ import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
-import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
@@ -147,12 +146,11 @@ public class TestStoragePolicySatisfier {
startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster);
- dfs.satisfyStoragePolicy(new Path(file));
-
hdfsCluster.triggerHeartbeats();
+ dfs.satisfyStoragePolicy(new Path(file));
// Wait till namenode notified about the block location details
- DFSTestUtil.waitExpectedStorageType(
- file, StorageType.ARCHIVE, 3, 30000, dfs);
+ DFSTestUtil.waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 35000,
+ dfs);
}
@Test(timeout = 300000)
@@ -1284,6 +1282,7 @@ public class TestStoragePolicySatisfier {
{StorageType.ARCHIVE, StorageType.SSD},
{StorageType.DISK, StorageType.DISK}};
config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+ config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10);
hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
storagesPerDatanode, capacity);
dfs = hdfsCluster.getFileSystem();
@@ -1299,19 +1298,28 @@ public class TestStoragePolicySatisfier {
//Queue limit can control the traverse logic to wait for some free
//entry in queue. After 10 files, traverse control will be on U.
- StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
- Mockito.when(sps.isRunning()).thenReturn(true);
- Context ctxt = Mockito.mock(Context.class);
- config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10);
- Mockito.when(ctxt.getConf()).thenReturn(config);
- Mockito.when(ctxt.isRunning()).thenReturn(true);
- Mockito.when(ctxt.isInSafeMode()).thenReturn(false);
- Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true);
- BlockStorageMovementNeeded movmentNeededQueue =
- new BlockStorageMovementNeeded(ctxt);
+ StoragePolicySatisfier sps = new StoragePolicySatisfier(config);
+ Context ctxt = new IntraSPSNameNodeContext(hdfsCluster.getNamesystem(),
+ hdfsCluster.getNamesystem().getBlockManager(), sps) {
+ @Override
+ public boolean isInSafeMode() {
+ return false;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return true;
+ }
+ };
+
+ FileIdCollector fileIDCollector =
+ new IntraSPSNameNodeFileIdCollector(fsDir, sps);
+ sps.init(ctxt, fileIDCollector, null);
+ sps.getStorageMovementQueue().activate();
+
INode rootINode = fsDir.getINode("/root");
- movmentNeededQueue.addToPendingDirQueue(rootINode.getId());
- movmentNeededQueue.init(fsDir);
+ hdfsCluster.getNamesystem().getBlockManager()
+ .addSPSPathId(rootINode.getId());
//Wait for thread to reach U.
Thread.sleep(1000);
@@ -1321,7 +1329,7 @@ public class TestStoragePolicySatisfier {
// Remove 10 element and make queue free, So other traversing will start.
for (int i = 0; i < 10; i++) {
String path = expectedTraverseOrder.remove(0);
- long trackId = movmentNeededQueue.get().getTrackId();
+ long trackId = sps.getStorageMovementQueue().get().getFileId();
INode inode = fsDir.getInode(trackId);
assertTrue("Failed to traverse tree, expected " + path + " but got "
+ inode.getFullPathName(), path.equals(inode.getFullPathName()));
@@ -1332,7 +1340,7 @@ public class TestStoragePolicySatisfier {
// Check other element traversed in order and R,S should not be added in
// queue which we already removed from expected list
for (String path : expectedTraverseOrder) {
- long trackId = movmentNeededQueue.get().getTrackId();
+ long trackId = sps.getStorageMovementQueue().get().getFileId();
INode inode = fsDir.getInode(trackId);
assertTrue("Failed to traverse tree, expected " + path + " but got "
+ inode.getFullPathName(), path.equals(inode.getFullPathName()));
@@ -1352,6 +1360,7 @@ public class TestStoragePolicySatisfier {
{StorageType.ARCHIVE, StorageType.SSD},
{StorageType.DISK, StorageType.DISK}};
config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+ config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10);
hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
storagesPerDatanode, capacity);
dfs = hdfsCluster.getFileSystem();
@@ -1366,21 +1375,33 @@ public class TestStoragePolicySatisfier {
expectedTraverseOrder.remove("/root/D/M");
expectedTraverseOrder.remove("/root/E");
FSDirectory fsDir = hdfsCluster.getNamesystem().getFSDirectory();
- StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
- Mockito.when(sps.isRunning()).thenReturn(true);
+
// Queue limit can control the traverse logic to wait for some free
// entry in queue. After 10 files, traverse control will be on U.
- Context ctxt = Mockito.mock(Context.class);
- config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10);
- Mockito.when(ctxt.getConf()).thenReturn(config);
- Mockito.when(ctxt.isRunning()).thenReturn(true);
- Mockito.when(ctxt.isInSafeMode()).thenReturn(false);
- Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true);
- BlockStorageMovementNeeded movmentNeededQueue =
- new BlockStorageMovementNeeded(ctxt);
- movmentNeededQueue.init(fsDir);
+ // StoragePolicySatisfier sps = new StoragePolicySatisfier(config);
+ StoragePolicySatisfier sps = new StoragePolicySatisfier(config);
+ Context ctxt = new IntraSPSNameNodeContext(hdfsCluster.getNamesystem(),
+ hdfsCluster.getNamesystem().getBlockManager(), sps) {
+ @Override
+ public boolean isInSafeMode() {
+ return false;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return true;
+ }
+ };
+
+ FileIdCollector fileIDCollector =
+ new IntraSPSNameNodeFileIdCollector(fsDir, sps);
+ sps.init(ctxt, fileIDCollector, null);
+ sps.getStorageMovementQueue().activate();
+
INode rootINode = fsDir.getINode("/root");
- movmentNeededQueue.addToPendingDirQueue(rootINode.getId());
+ hdfsCluster.getNamesystem().getBlockManager()
+ .addSPSPathId(rootINode.getId());
+
// Wait for thread to reach U.
Thread.sleep(1000);
@@ -1389,7 +1410,7 @@ public class TestStoragePolicySatisfier {
// Remove 10 element and make queue free, So other traversing will start.
for (int i = 0; i < 10; i++) {
String path = expectedTraverseOrder.remove(0);
- long trackId = movmentNeededQueue.get().getTrackId();
+ long trackId = sps.getStorageMovementQueue().get().getFileId();
INode inode = fsDir.getInode(trackId);
assertTrue("Failed to traverse tree, expected " + path + " but got "
+ inode.getFullPathName(), path.equals(inode.getFullPathName()));
@@ -1400,7 +1421,7 @@ public class TestStoragePolicySatisfier {
// Check other element traversed in order and E, M, U, R, S should not be
// added in queue which we already removed from expected list
for (String path : expectedTraverseOrder) {
- long trackId = movmentNeededQueue.get().getTrackId();
+ long trackId = sps.getStorageMovementQueue().get().getFileId();
INode inode = fsDir.getInode(trackId);
assertTrue("Failed to traverse tree, expected " + path + " but got "
+ inode.getFullPathName(), path.equals(inode.getFullPathName()));
@@ -1502,17 +1523,20 @@ public class TestStoragePolicySatisfier {
hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(2)
.storageTypes(storagetypes).build();
hdfsCluster.waitActive();
- BlockStorageMovementNeeded.setStatusClearanceElapsedTimeMs(20000);
+ // BlockStorageMovementNeeded.setStatusClearanceElapsedTimeMs(200000);
dfs = hdfsCluster.getFileSystem();
Path filePath = new Path("/file");
DFSTestUtil.createFile(dfs, filePath, 1024, (short) 2,
0);
dfs.setStoragePolicy(filePath, "COLD");
dfs.satisfyStoragePolicy(filePath);
+ Thread.sleep(3000);
StoragePolicySatisfyPathStatus status = dfs.getClient()
.checkStoragePolicySatisfyPathStatus(filePath.toString());
- Assert.assertTrue("Status should be IN_PROGRESS",
- StoragePolicySatisfyPathStatus.IN_PROGRESS.equals(status));
+ Assert.assertTrue(
+ "Status should be IN_PROGRESS/SUCCESS, but status is " + status,
+ StoragePolicySatisfyPathStatus.IN_PROGRESS.equals(status)
+ || StoragePolicySatisfyPathStatus.SUCCESS.equals(status));
DFSTestUtil.waitExpectedStorageType(filePath.toString(),
StorageType.ARCHIVE, 2, 30000, dfs);
@@ -1530,7 +1554,7 @@ public class TestStoragePolicySatisfier {
return false;
}
}, 100, 60000);
-
+ BlockStorageMovementNeeded.setStatusClearanceElapsedTimeMs(1000);
// wait till status is NOT_AVAILABLE
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
@@ -1719,8 +1743,10 @@ public class TestStoragePolicySatisfier {
public Boolean get() {
LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
expectedBlkMovAttemptedCount,
- sps.getAttemptedItemsMonitor().getAttemptedItemsCount());
- return sps.getAttemptedItemsMonitor()
+ ((BlockStorageMovementAttemptedItems) (sps
+ .getAttemptedItemsMonitor())).getAttemptedItemsCount());
+ return ((BlockStorageMovementAttemptedItems) (sps
+ .getAttemptedItemsMonitor()))
.getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
}
}, 100, timeout);
@@ -1736,8 +1762,11 @@ public class TestStoragePolicySatisfier {
public Boolean get() {
LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
expectedMovementFinishedBlocksCount,
- sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount());
- return sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount()
+ ((BlockStorageMovementAttemptedItems) (sps
+ .getAttemptedItemsMonitor())).getMovementFinishedBlocksCount());
+ return ((BlockStorageMovementAttemptedItems) (sps
+ .getAttemptedItemsMonitor()))
+ .getMovementFinishedBlocksCount()
>= expectedMovementFinishedBlocksCount;
}
}, 100, timeout);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8400f79/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java
index c1a2b8b..0e3a5a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java
@@ -500,9 +500,11 @@ public class TestStoragePolicySatisfierWithStripedFile {
public Boolean get() {
LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
expectedBlkMovAttemptedCount,
- sps.getAttemptedItemsMonitor().getAttemptedItemsCount());
- return sps.getAttemptedItemsMonitor()
- .getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
+ ((BlockStorageMovementAttemptedItems) sps
+ .getAttemptedItemsMonitor()).getAttemptedItemsCount());
+ return ((BlockStorageMovementAttemptedItems) sps
+ .getAttemptedItemsMonitor())
+ .getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
}
}, 100, timeout);
}
@@ -560,7 +562,7 @@ public class TestStoragePolicySatisfierWithStripedFile {
// Check whether the block movement attempt report has been arrived at the
// Namenode(SPS).
private void waitForBlocksMovementAttemptReport(MiniDFSCluster cluster,
- long expectedMovementFinishedBlocksCount, int timeout)
+ long expectedMoveFinishedBlks, int timeout)
throws TimeoutException, InterruptedException {
BlockManager blockManager = cluster.getNamesystem().getBlockManager();
final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
@@ -570,10 +572,11 @@ public class TestStoragePolicySatisfierWithStripedFile {
@Override
public Boolean get() {
LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
- expectedMovementFinishedBlocksCount,
- sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount());
- return sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount()
- >= expectedMovementFinishedBlocksCount;
+ expectedMoveFinishedBlks, ((BlockStorageMovementAttemptedItems) sps
+ .getAttemptedItemsMonitor()).getMovementFinishedBlocksCount());
+ return ((BlockStorageMovementAttemptedItems) sps
+ .getAttemptedItemsMonitor())
+ .getMovementFinishedBlocksCount() >= expectedMoveFinishedBlks;
}
}, 100, timeout);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org