You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2018/01/29 04:12:34 UTC
[29/50] [abbrv] hadoop git commit: HDFS-12570: [SPS]: Refactor
Co-ordinator datanode logic to track the block storage movements. Contributed
by Rakesh R.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/776af85f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index 57e9f94..70219f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -203,11 +203,11 @@ public class TestStoragePolicySatisfier {
}
/**
- * Tests to verify that the block storage movement results will be propagated
+ * Tests to verify that the block storage movement report will be propagated
* to Namenode via datanode heartbeat.
*/
@Test(timeout = 300000)
- public void testPerTrackIdBlocksStorageMovementResults() throws Exception {
+ public void testBlksStorageMovementAttemptFinishedReport() throws Exception {
try {
createCluster();
// Change policy to ONE_SSD
@@ -229,7 +229,7 @@ public class TestStoragePolicySatisfier {
DFSTestUtil.waitExpectedStorageType(
file, StorageType.DISK, 2, 30000, dfs);
- waitForBlocksMovementResult(1, 30000);
+ waitForBlocksMovementAttemptReport(1, 30000);
} finally {
shutdownCluster();
}
@@ -276,7 +276,7 @@ public class TestStoragePolicySatisfier {
fileName, StorageType.DISK, 2, 30000, dfs);
}
- waitForBlocksMovementResult(files.size(), 30000);
+ waitForBlocksMovementAttemptReport(files.size(), 30000);
} finally {
shutdownCluster();
}
@@ -457,7 +457,7 @@ public class TestStoragePolicySatisfier {
DFSTestUtil.waitExpectedStorageType(
file, StorageType.DISK, 2, 30000, dfs);
- waitForBlocksMovementResult(1, 30000);
+ waitForBlocksMovementAttemptReport(1, 30000);
} finally {
shutdownCluster();
}
@@ -630,7 +630,7 @@ public class TestStoragePolicySatisfier {
// No block movement will be scheduled as there is no target node
// available with the required storage type.
waitForAttemptedItems(1, 30000);
- waitForBlocksMovementResult(1, 30000);
+ waitForBlocksMovementAttemptReport(1, 30000);
DFSTestUtil.waitExpectedStorageType(
file1, StorageType.ARCHIVE, 1, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(
@@ -691,7 +691,7 @@ public class TestStoragePolicySatisfier {
DFSTestUtil.waitExpectedStorageType(
file, StorageType.DISK, 3, 30000, dfs);
- waitForBlocksMovementResult(1, 30000);
+ waitForBlocksMovementAttemptReport(1, 30000);
} finally {
shutdownCluster();
}
@@ -871,7 +871,7 @@ public class TestStoragePolicySatisfier {
Set<DatanodeDescriptor> dns = hdfsCluster.getNamesystem()
.getBlockManager().getDatanodeManager().getDatanodes();
for (DatanodeDescriptor dd : dns) {
- assertNull(dd.getBlocksToMoveStorages());
+ assertNull(dd.getBlocksToMoveStorages(1));
}
// Enable heart beats now
@@ -1224,7 +1224,7 @@ public class TestStoragePolicySatisfier {
/**
* Test SPS for batch processing.
*/
- @Test(timeout = 300000)
+ @Test(timeout = 3000000)
public void testBatchProcessingForSPSDirectory() throws Exception {
try {
StorageType[][] diskTypes = new StorageType[][] {
@@ -1252,7 +1252,7 @@ public class TestStoragePolicySatisfier {
DFSTestUtil.waitExpectedStorageType(fileName, StorageType.ARCHIVE, 2,
30000, dfs);
}
- waitForBlocksMovementResult(files.size(), 30000);
+ waitForBlocksMovementAttemptReport(files.size(), 30000);
String expectedLogMessage = "StorageMovementNeeded queue remaining"
+ " capacity is zero";
assertTrue("Log output does not contain expected log message: "
@@ -1268,7 +1268,7 @@ public class TestStoragePolicySatisfier {
* 1. Delete /root when traversing Q
* 2. U, R, S should not be in queued.
*/
- @Test
+ @Test(timeout = 300000)
public void testTraverseWhenParentDeleted() throws Exception {
StorageType[][] diskTypes = new StorageType[][] {
{StorageType.DISK, StorageType.ARCHIVE},
@@ -1330,7 +1330,7 @@ public class TestStoragePolicySatisfier {
* 1. Delete L when traversing Q
* 2. E, M, U, R, S should not be in queued.
*/
- @Test
+ @Test(timeout = 300000)
public void testTraverseWhenRootParentDeleted() throws Exception {
StorageType[][] diskTypes = new StorageType[][] {
{StorageType.DISK, StorageType.ARCHIVE},
@@ -1387,6 +1387,82 @@ public class TestStoragePolicySatisfier {
dfs.delete(new Path("/root"), true);
}
+ /**
+ * Test storage move blocks while under replication block tasks exists in the
+ * system. So, both will share the max transfer streams.
+ *
+ * 1. Create cluster with 3 datanode.
+ * 2. Create 20 files with 2 replica.
+ * 3. Start 2 more DNs with DISK & SSD types
+ * 4. SetReplication factor for the 1st 10 files to 4 to trigger replica task
+ * 5. Set policy to SSD to the 2nd set of files from 11-20
+ * 6. Call SPS for 11-20 files to trigger move block tasks to new DNs
+ * 7. Wait for the under replica and SPS tasks completion
+ */
+ @Test(timeout = 300000)
+ public void testMoveBlocksWithUnderReplicatedBlocks() throws Exception {
+ try {
+ config.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 3);
+ config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+ config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+ true);
+ config.set(DFSConfigKeys
+ .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
+ "3000");
+ config.setBoolean(DFSConfigKeys
+ .DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY,
+ true);
+
+ StorageType[][] storagetypes = new StorageType[][] {
+ {StorageType.ARCHIVE, StorageType.DISK},
+ {StorageType.ARCHIVE, StorageType.DISK}};
+ hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(2)
+ .storageTypes(storagetypes).build();
+ hdfsCluster.waitActive();
+ dfs = hdfsCluster.getFileSystem();
+
+ // Below files will be used for pending replication block tasks.
+ for (int i=1; i<=20; i++){
+ Path filePath = new Path("/file" + i);
+ DFSTestUtil.createFile(dfs, filePath, DEFAULT_BLOCK_SIZE * 5, (short) 2,
+ 0);
+ }
+
+ StorageType[][] newtypes =
+ new StorageType[][]{{StorageType.DISK, StorageType.SSD},
+ {StorageType.DISK, StorageType.SSD}};
+ startAdditionalDNs(config, 2, numOfDatanodes, newtypes,
+ storagesPerDatanode, capacity, hdfsCluster);
+
+ // increase replication factor to 4 for the first 10 files and thus
+ // initiate replica tasks
+ for (int i=1; i<=10; i++){
+ Path filePath = new Path("/file" + i);
+ dfs.setReplication(filePath, (short) 4);
+ }
+
+ // invoke SPS for 11-20 files
+ for (int i = 11; i <= 20; i++) {
+ Path filePath = new Path("/file" + i);
+ dfs.setStoragePolicy(filePath, "ALL_SSD");
+ dfs.satisfyStoragePolicy(filePath);
+ }
+
+ for (int i = 1; i <= 10; i++) {
+ Path filePath = new Path("/file" + i);
+ DFSTestUtil.waitExpectedStorageType(filePath.toString(),
+ StorageType.DISK, 4, 30000, hdfsCluster.getFileSystem());
+ }
+ for (int i = 11; i <= 20; i++) {
+ Path filePath = new Path("/file" + i);
+ DFSTestUtil.waitExpectedStorageType(filePath.toString(),
+ StorageType.SSD, 2, 30000, hdfsCluster.getFileSystem());
+ }
+ } finally {
+ shutdownCluster();
+ }
+ }
+
private static void createDirectoryTree(DistributedFileSystem dfs)
throws Exception {
// tree structure
@@ -1514,18 +1590,19 @@ public class TestStoragePolicySatisfier {
}, 100, timeout);
}
- private void waitForBlocksMovementResult(long expectedBlkMovResultsCount,
- int timeout) throws TimeoutException, InterruptedException {
+ private void waitForBlocksMovementAttemptReport(
+ long expectedMovementFinishedBlocksCount, int timeout)
+ throws TimeoutException, InterruptedException {
BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
- LOG.info("expectedResultsCount={} actualResultsCount={}",
- expectedBlkMovResultsCount,
- sps.getAttemptedItemsMonitor().resultsCount());
- return sps.getAttemptedItemsMonitor()
- .resultsCount() == expectedBlkMovResultsCount;
+ LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
+ expectedMovementFinishedBlocksCount,
+ sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount());
+ return sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount()
+ >= expectedMovementFinishedBlocksCount;
}
}, 100, timeout);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/776af85f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
index fc5d0a5..154ddae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
@@ -180,7 +180,7 @@ public class TestStoragePolicySatisfierWithStripedFile {
LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy");
cluster.triggerHeartbeats();
- waitForBlocksMovementResult(cluster, 1, 60000);
+ waitForBlocksMovementAttemptReport(cluster, 9, 60000);
// verify storage types and locations
waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.ARCHIVE, 9,
9, 60000);
@@ -290,7 +290,7 @@ public class TestStoragePolicySatisfierWithStripedFile {
LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy");
cluster.triggerHeartbeats();
- waitForBlocksMovementResult(cluster, 1, 60000);
+ waitForBlocksMovementAttemptReport(cluster, 5, 60000);
waitForAttemptedItems(cluster, 1, 30000);
// verify storage types and locations.
waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.ARCHIVE, 5,
@@ -556,10 +556,10 @@ public class TestStoragePolicySatisfierWithStripedFile {
}, 100, timeout);
}
- // Check whether the block movement result has been arrived at the
+ // Check whether the block movement attempt report has been arrived at the
// Namenode(SPS).
- private void waitForBlocksMovementResult(MiniDFSCluster cluster,
- long expectedBlkMovResultsCount, int timeout)
+ private void waitForBlocksMovementAttemptReport(MiniDFSCluster cluster,
+ long expectedMovementFinishedBlocksCount, int timeout)
throws TimeoutException, InterruptedException {
BlockManager blockManager = cluster.getNamesystem().getBlockManager();
final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
@@ -568,11 +568,11 @@ public class TestStoragePolicySatisfierWithStripedFile {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
- LOG.info("expectedResultsCount={} actualResultsCount={}",
- expectedBlkMovResultsCount,
- sps.getAttemptedItemsMonitor().resultsCount());
- return sps.getAttemptedItemsMonitor()
- .resultsCount() == expectedBlkMovResultsCount;
+ LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
+ expectedMovementFinishedBlocksCount,
+ sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount());
+ return sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount()
+ >= expectedMovementFinishedBlocksCount;
}
}, 100, timeout);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org