You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2017/06/21 23:23:49 UTC
[02/50] [abbrv] hadoop git commit: HDFS-10999. Introduce separate
stats for Replicated and Erasure Coded Blocks apart from the current
Aggregated stats. (Manoj Govindassamy via lei)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 038b6ce..5075c05 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -110,10 +110,13 @@ import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
+import org.apache.hadoop.hdfs.protocol.ECBlockGroupsStats;
+import org.apache.hadoop.hdfs.protocol.BlocksStats;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -1651,6 +1654,50 @@ public class DFSTestUtil {
}
/**
+ * Verify the aggregated {@link ClientProtocol#getStats()} block counts equal
+ * the sum of {@link ClientProtocol#getBlocksStats()} and, where applicable,
+ * {@link ClientProtocol#getECBlockGroupsStats()} for the given cluster.
+ * @throws Exception if NameNode proxy creation or a stats RPC call fails
+ */
+ public static void verifyClientStats(Configuration conf,
+ MiniDFSCluster cluster) throws Exception {
+ ClientProtocol client = NameNodeProxies.createProxy(conf,
+ cluster.getFileSystem(0).getUri(),
+ ClientProtocol.class).getProxy();
+ long[] aggregatedStats = cluster.getNameNode().getRpcServer().getStats();
+ BlocksStats blocksStats =
+ client.getBlocksStats();
+ ECBlockGroupsStats ecBlockGroupsStats = client.getECBlockGroupsStats();
+
+ assertEquals("Under replicated stats not matching!",
+ aggregatedStats[ClientProtocol.GET_STATS_LOW_REDUNDANCY_IDX],
+ aggregatedStats[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX]);
+ assertEquals("Low redundancy stats not matching!",
+ aggregatedStats[ClientProtocol.GET_STATS_LOW_REDUNDANCY_IDX],
+ blocksStats.getLowRedundancyBlocksStat() +
+ ecBlockGroupsStats.getLowRedundancyBlockGroupsStat());
+ assertEquals("Corrupt blocks stats not matching!",
+ aggregatedStats[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX],
+ blocksStats.getCorruptBlocksStat() +
+ ecBlockGroupsStats.getCorruptBlockGroupsStat());
+ assertEquals("Missing blocks stats not matching!",
+ aggregatedStats[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX],
+ blocksStats.getMissingReplicaBlocksStat() +
+ ecBlockGroupsStats.getMissingBlockGroupsStat());
+ assertEquals("Missing blocks with replication factor one not matching!",
+ aggregatedStats[ClientProtocol.GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX],
+ blocksStats.getMissingReplicationOneBlocksStat());
+ assertEquals("Bytes in future blocks stats not matching!",
+ aggregatedStats[ClientProtocol.GET_STATS_BYTES_IN_FUTURE_BLOCKS_IDX],
+ blocksStats.getBytesInFutureBlocksStat() +
+ ecBlockGroupsStats.getBytesInFutureBlockGroupsStat());
+ assertEquals("Pending deletion blocks stats not matching!",
+ aggregatedStats[ClientProtocol.GET_STATS_PENDING_DELETION_BLOCKS_IDX],
+ blocksStats.getPendingDeletionBlocksStat() +
+ ecBlockGroupsStats.getPendingDeletionBlockGroupsStat());
+ }
+
+ /**
* Helper function to create a key in the Key Provider. Defaults
* to the first indexed NameNode's Key Provider.
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java
index 1d9d402..3e9d812 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java
@@ -281,7 +281,7 @@ public class TestFileCorruption {
@Override public Boolean get() {
try {
return cluster.getNamesystem().getBlockManager()
- .getUnderReplicatedBlocksCount() == 1;
+ .getLowRedundancyBlocksCount() == 1;
} catch (Exception e) {
e.printStackTrace();
return false;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java
index e0dfb4a..7c536e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java
@@ -549,7 +549,7 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
FileSystem fileSys = getCluster().getFileSystem(0);
FSNamesystem ns = getCluster().getNamesystem(0);
- writeFile(fileSys, file, replicas, 1);
+ writeFile(fileSys, file, replicas, 25);
DatanodeInfo nodeOutofService = takeNodeOutofService(0,
getFirstBlockFirstReplicaUuid(fileSys, file), Long.MAX_VALUE, null,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java
index e225141..ca2fe92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java
@@ -98,7 +98,7 @@ public class TestMissingBlocksAlert {
Thread.sleep(100);
}
assertTrue(dfs.getMissingBlocksCount() == 1);
- assertEquals(4, dfs.getUnderReplicatedBlocksCount());
+ assertEquals(4, dfs.getLowRedundancyBlocksCount());
assertEquals(3, bm.getUnderReplicatedNotMissingBlocks());
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -117,7 +117,7 @@ public class TestMissingBlocksAlert {
Thread.sleep(100);
}
- assertEquals(2, dfs.getUnderReplicatedBlocksCount());
+ assertEquals(2, dfs.getLowRedundancyBlocksCount());
assertEquals(2, bm.getUnderReplicatedNotMissingBlocks());
Assert.assertEquals(0, (long)(Long) mbs.getAttribute(mxbeanName,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java
index 61e69f3..2413918 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java
@@ -17,18 +17,24 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
+import java.util.Random;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -53,10 +59,19 @@ public class TestComputeInvalidateWork {
private FSNamesystem namesystem;
private BlockManager bm;
private DatanodeDescriptor[] nodes;
+ private ErasureCodingPolicy ecPolicy;
+ private DistributedFileSystem fs;
+ private Path ecFile;
+ private int totalBlockGroups, blockGroupSize, stripesPerBlock, cellSize;
+ private LocatedStripedBlock locatedStripedBlock;
@Before
public void setup() throws Exception {
+ ecPolicy = SystemErasureCodingPolicies.getByID(
+ SystemErasureCodingPolicies.XOR_2_1_POLICY_ID);
conf = new HdfsConfiguration();
+ conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
+ ecPolicy.getName());
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_OF_DATANODES)
.build();
cluster.waitActive();
@@ -65,6 +80,25 @@ public class TestComputeInvalidateWork {
nodes = bm.getDatanodeManager().getHeartbeatManager().getDatanodes();
BlockManagerTestUtil.stopRedundancyThread(bm);
assertEquals(nodes.length, NUM_OF_DATANODES);
+
+ // Create a striped file
+ Path ecDir = new Path("/ec");
+ fs = cluster.getFileSystem();
+ fs.mkdirs(ecDir);
+ fs.getClient().setErasureCodingPolicy(ecDir.toString(), ecPolicy.getName());
+ ecFile = new Path(ecDir, "ec-file");
+ stripesPerBlock = 2;
+ cellSize = ecPolicy.getCellSize();
+ int blockSize = stripesPerBlock * cellSize;
+ blockGroupSize = ecPolicy.getNumDataUnits() * blockSize;
+ totalBlockGroups = 4;
+ DFSTestUtil.createStripedFile(cluster, ecFile, ecDir, totalBlockGroups,
+ stripesPerBlock, false, ecPolicy);
+ LocatedBlocks lbs = cluster.getFileSystem().getClient().
+ getNamenode().getBlockLocations(
+ ecFile.toString(), 0, blockGroupSize);
+ assert lbs.get(0) instanceof LocatedStripedBlock;
+ locatedStripedBlock = (LocatedStripedBlock)(lbs.get(0));
}
@After
@@ -75,12 +109,28 @@ public class TestComputeInvalidateWork {
}
}
+ private void verifyInvalidationWorkCounts(int blockInvalidateLimit) {
+ assertEquals(blockInvalidateLimit * NUM_OF_DATANODES,
+ bm.computeInvalidateWork(NUM_OF_DATANODES + 1));
+ assertEquals(blockInvalidateLimit * NUM_OF_DATANODES,
+ bm.computeInvalidateWork(NUM_OF_DATANODES));
+ assertEquals(blockInvalidateLimit * (NUM_OF_DATANODES - 1),
+ bm.computeInvalidateWork(NUM_OF_DATANODES - 1));
+ int workCount = bm.computeInvalidateWork(1);
+ if (workCount == 1) {
+ assertEquals(blockInvalidateLimit + 1, bm.computeInvalidateWork(2));
+ } else {
+ assertEquals(workCount, blockInvalidateLimit);
+ assertEquals(2, bm.computeInvalidateWork(2));
+ }
+ }
+
/**
* Test if {@link BlockManager#computeInvalidateWork(int)}
- * can schedule invalidate work correctly
+ * can schedule invalidate work correctly for the replicas.
*/
@Test(timeout=120000)
- public void testCompInvalidate() throws Exception {
+ public void testComputeInvalidateReplicas() throws Exception {
final int blockInvalidateLimit = bm.getDatanodeManager()
.getBlockInvalidateLimit();
namesystem.writeLock();
@@ -92,20 +142,66 @@ public class TestComputeInvalidateWork {
bm.addToInvalidates(block, nodes[i]);
}
}
-
- assertEquals(blockInvalidateLimit*NUM_OF_DATANODES,
- bm.computeInvalidateWork(NUM_OF_DATANODES+1));
- assertEquals(blockInvalidateLimit*NUM_OF_DATANODES,
- bm.computeInvalidateWork(NUM_OF_DATANODES));
- assertEquals(blockInvalidateLimit*(NUM_OF_DATANODES-1),
- bm.computeInvalidateWork(NUM_OF_DATANODES-1));
- int workCount = bm.computeInvalidateWork(1);
- if (workCount == 1) {
- assertEquals(blockInvalidateLimit+1, bm.computeInvalidateWork(2));
- } else {
- assertEquals(workCount, blockInvalidateLimit);
- assertEquals(2, bm.computeInvalidateWork(2));
+ verifyInvalidationWorkCounts(blockInvalidateLimit);
+ } finally {
+ namesystem.writeUnlock();
+ }
+ }
+
+ /**
+ * Test if {@link BlockManager#computeInvalidateWork(int)}
+ * can schedule invalidate work correctly for the striped block groups.
+ */
+ @Test(timeout=120000)
+ public void testComputeInvalidateStripedBlockGroups() throws Exception {
+ final int blockInvalidateLimit =
+ bm.getDatanodeManager().getBlockInvalidateLimit();
+ namesystem.writeLock();
+ try {
+ int nodeCount = ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits();
+ for (int i = 0; i < nodeCount; i++) {
+ for(int j = 0; j < 3 * blockInvalidateLimit + 1; j++) {
+ Block blk = new Block(locatedStripedBlock.getBlock().getBlockId() +
+ (i * 10 + j), stripesPerBlock * cellSize,
+ locatedStripedBlock.getBlock().getGenerationStamp());
+ bm.addToInvalidates(blk, nodes[i]);
+ }
+ }
+ verifyInvalidationWorkCounts(blockInvalidateLimit);
+ } finally {
+ namesystem.writeUnlock();
+ }
+ }
+
+ /**
+ * Test if {@link BlockManager#computeInvalidateWork(int)}
+ * can schedule invalidate work correctly for both replicas and striped
+ * block groups, combined.
+ */
+ @Test(timeout=120000)
+ public void testComputeInvalidate() throws Exception {
+ final int blockInvalidateLimit =
+ bm.getDatanodeManager().getBlockInvalidateLimit();
+ final Random random = new Random(System.currentTimeMillis());
+ namesystem.writeLock();
+ try {
+ int nodeCount = ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits();
+ for (int i = 0; i < nodeCount; i++) {
+ for(int j = 0; j < 3 * blockInvalidateLimit + 1; j++) {
+ if (random.nextBoolean()) {
+ Block stripedBlock = new Block(
+ locatedStripedBlock.getBlock().getBlockId() + (i * 10 + j),
+ stripesPerBlock * cellSize,
+ locatedStripedBlock.getBlock().getGenerationStamp());
+ bm.addToInvalidates(stripedBlock, nodes[i]);
+ } else {
+ Block replica = new Block(i * (blockInvalidateLimit + 1) + j, 0,
+ GenerationStamp.LAST_RESERVED_STAMP);
+ bm.addToInvalidates(replica, nodes[i]);
+ }
+ }
}
+ verifyInvalidationWorkCounts(blockInvalidateLimit);
} finally {
namesystem.writeUnlock();
}
@@ -129,6 +225,11 @@ public class TestComputeInvalidateWork {
Block block = new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP);
bm.addToInvalidates(block, nodes[0]);
+ Block stripedBlock = new Block(
+ locatedStripedBlock.getBlock().getBlockId() + 100,
+ stripesPerBlock * cellSize,
+ locatedStripedBlock.getBlock().getGenerationStamp());
+ bm.addToInvalidates(stripedBlock, nodes[0]);
bm.getDatanodeManager().registerDatanode(dnr);
// Since UUID has changed, the invalidation work should be skipped
@@ -145,26 +246,37 @@ public class TestComputeInvalidateWork {
final DistributedFileSystem dfs = cluster.getFileSystem();
final Path path = new Path("/testRR");
// Create a file and shutdown the DNs, which populates InvalidateBlocks
+ short totalReplicas = NUM_OF_DATANODES;
DFSTestUtil.createFile(dfs, path, dfs.getDefaultBlockSize(),
- (short) NUM_OF_DATANODES, 0xED0ED0);
+ totalReplicas, 0xED0ED0);
DFSTestUtil.waitForReplication(dfs, path, (short) NUM_OF_DATANODES, 12000);
for (DataNode dn : cluster.getDataNodes()) {
dn.shutdown();
}
dfs.delete(path, false);
+ dfs.delete(ecFile, false);
namesystem.writeLock();
InvalidateBlocks invalidateBlocks;
- int expected = NUM_OF_DATANODES;
+ int totalStripedDataBlocks = totalBlockGroups * (ecPolicy.getNumDataUnits()
+ + ecPolicy.getNumParityUnits());
+ int expected = totalReplicas + totalStripedDataBlocks;
try {
invalidateBlocks = (InvalidateBlocks) Whitebox
.getInternalState(cluster.getNamesystem().getBlockManager(),
"invalidateBlocks");
- assertEquals("Expected invalidate blocks to be the number of DNs",
+ assertEquals("Invalidate blocks should include both Replicas and " +
+ "Striped BlockGroups!",
(long) expected, invalidateBlocks.numBlocks());
+ assertEquals("Unexpected invalidate count for replicas!",
+ totalReplicas, invalidateBlocks.getBlocksStat());
+ assertEquals("Unexpected invalidate count for striped block groups!",
+ totalStripedDataBlocks, invalidateBlocks.getECBlockGroupsStat());
} finally {
namesystem.writeUnlock();
}
// Re-register each DN and see that it wipes the invalidation work
+ int totalBlockGroupsPerDataNode = totalBlockGroups;
+ int totalReplicasPerDataNode = totalReplicas / NUM_OF_DATANODES;
for (DataNode dn : cluster.getDataNodes()) {
DatanodeID did = dn.getDatanodeId();
DatanodeRegistration reg = new DatanodeRegistration(
@@ -175,7 +287,7 @@ public class TestComputeInvalidateWork {
namesystem.writeLock();
try {
bm.getDatanodeManager().registerDatanode(reg);
- expected--;
+ expected -= (totalReplicasPerDataNode + totalBlockGroupsPerDataNode);
assertEquals("Expected number of invalidate blocks to decrease",
(long) expected, invalidateBlocks.numBlocks());
} finally {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java
index 4bdaaac..3f8a5cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java
@@ -25,14 +25,13 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockType;
import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
import org.junit.Test;
@@ -45,88 +44,130 @@ import org.junit.Test;
*/
public class TestCorruptReplicaInfo {
- private static final Log LOG =
- LogFactory.getLog(TestCorruptReplicaInfo.class);
-
- private final Map<Long, Block> block_map =
- new HashMap<Long, Block>();
-
- // Allow easy block creation by block id
- // Return existing block if one with same block id already exists
- private Block getBlock(Long block_id) {
- if (!block_map.containsKey(block_id)) {
- block_map.put(block_id, new Block(block_id,0,0));
+ private static final Log LOG = LogFactory.getLog(
+ TestCorruptReplicaInfo.class);
+ private final Map<Long, Block> replicaMap = new HashMap<>();
+ private final Map<Long, Block> stripedBlocksMap = new HashMap<>();
+
+ // Allow easy block creation by block id. Return existing
+ // replica block if one with same block id already exists.
+ private Block getReplica(Long blockId) {
+ if (!replicaMap.containsKey(blockId)) {
+ replicaMap.put(blockId, new Block(blockId, 0, 0));
}
-
- return block_map.get(block_id);
+ return replicaMap.get(blockId);
}
-
- private Block getBlock(int block_id) {
- return getBlock((long)block_id);
+
+ private Block getReplica(int blkId) {
+ return getReplica(Long.valueOf(blkId));
+ }
+
+ private Block getStripedBlock(int blkId) {
+ Long stripedBlockId = (1L << 63) + blkId;
+ assertTrue(BlockIdManager.isStripedBlockID(stripedBlockId));
+ if (!stripedBlocksMap.containsKey(stripedBlockId)) {
+ stripedBlocksMap.put(stripedBlockId, new Block(stripedBlockId, 1024, 0));
+ }
+ return stripedBlocksMap.get(stripedBlockId);
+ }
+
+ private void verifyCorruptBlocksCount(CorruptReplicasMap corruptReplicasMap,
+ long expectedReplicaCount, long expectedStripedBlockCount) {
+ long totalExpectedCorruptBlocks = expectedReplicaCount +
+ expectedStripedBlockCount;
+ assertEquals("Unexpected total corrupt blocks count!",
+ totalExpectedCorruptBlocks, corruptReplicasMap.size());
+ assertEquals("Unexpected replica blocks count!",
+ expectedReplicaCount, corruptReplicasMap.getCorruptBlocksStat());
+ assertEquals("Unexpected striped blocks count!",
+ expectedStripedBlockCount,
+ corruptReplicasMap.getCorruptECBlockGroupsStat());
}
@Test
- public void testCorruptReplicaInfo() throws IOException,
- InterruptedException {
-
- CorruptReplicasMap crm = new CorruptReplicasMap();
-
- // Make sure initial values are returned correctly
- assertEquals("Number of corrupt blocks must initially be 0", 0, crm.size());
- assertNull("Param n cannot be less than 0", crm.getCorruptReplicaBlockIdsForTesting(-1, null));
- assertNull("Param n cannot be greater than 100", crm.getCorruptReplicaBlockIdsForTesting(101, null));
- long[] l = crm.getCorruptReplicaBlockIdsForTesting(0, null);
- assertNotNull("n = 0 must return non-null", l);
- assertEquals("n = 0 must return an empty list", 0, l.length);
-
- // create a list of block_ids. A list is used to allow easy validation of the
- // output of getCorruptReplicaBlockIds
- int NUM_BLOCK_IDS = 140;
- List<Long> block_ids = new LinkedList<Long>();
- for (int i=0;i<NUM_BLOCK_IDS;i++) {
- block_ids.add((long)i);
- }
-
- DatanodeDescriptor dn1 = DFSTestUtil.getLocalDatanodeDescriptor();
- DatanodeDescriptor dn2 = DFSTestUtil.getLocalDatanodeDescriptor();
-
- addToCorruptReplicasMap(crm, getBlock(0), dn1);
- assertEquals("Number of corrupt blocks not returning correctly",
- 1, crm.size());
- addToCorruptReplicasMap(crm, getBlock(1), dn1);
- assertEquals("Number of corrupt blocks not returning correctly",
- 2, crm.size());
-
- addToCorruptReplicasMap(crm, getBlock(1), dn2);
- assertEquals("Number of corrupt blocks not returning correctly",
- 2, crm.size());
-
- crm.removeFromCorruptReplicasMap(getBlock(1));
- assertEquals("Number of corrupt blocks not returning correctly",
- 1, crm.size());
-
- crm.removeFromCorruptReplicasMap(getBlock(0));
- assertEquals("Number of corrupt blocks not returning correctly",
- 0, crm.size());
-
- for (Long block_id: block_ids) {
- addToCorruptReplicasMap(crm, getBlock(block_id), dn1);
- }
-
- assertEquals("Number of corrupt blocks not returning correctly",
- NUM_BLOCK_IDS, crm.size());
-
- assertTrue("First five block ids not returned correctly ",
- Arrays.equals(new long[]{0,1,2,3,4},
- crm.getCorruptReplicaBlockIdsForTesting(5, null)));
-
- LOG.info(crm.getCorruptReplicaBlockIdsForTesting(10, 7L));
- LOG.info(block_ids.subList(7, 18));
-
- assertTrue("10 blocks after 7 not returned correctly ",
- Arrays.equals(new long[]{8,9,10,11,12,13,14,15,16,17},
- crm.getCorruptReplicaBlockIdsForTesting(10, 7L)));
-
+ public void testCorruptReplicaInfo()
+ throws IOException, InterruptedException {
+ CorruptReplicasMap crm = new CorruptReplicasMap();
+
+ // Make sure initial values are returned correctly
+ assertEquals("Total number of corrupt blocks must initially be 0!",
+ 0, crm.size());
+ assertEquals("Number of corrupt replicas must initially be 0!",
+ 0, crm.getCorruptBlocksStat());
+ assertEquals("Number of corrupt striped block groups must initially be 0!",
+ 0, crm.getCorruptECBlockGroupsStat());
+ assertNull("Param n cannot be less than 0",
+ crm.getCorruptBlockIdsForTesting(BlockType.CONTIGUOUS, -1, null));
+ assertNull("Param n cannot be greater than 100",
+ crm.getCorruptBlockIdsForTesting(BlockType.CONTIGUOUS, 101, null));
+ long[] l = crm.getCorruptBlockIdsForTesting(BlockType.CONTIGUOUS, 0, null);
+ assertNotNull("n = 0 must return non-null", l);
+ assertEquals("n = 0 must return an empty list", 0, l.length);
+
+ // Create a list of block ids. A list is used to allow easy
+ // validation of the output of getCorruptReplicaBlockIds.
+ final int blockCount = 140;
+ long[] replicaIds = new long[blockCount];
+ long[] stripedIds = new long[blockCount];
+ for (int i = 0; i < blockCount; i++) {
+ replicaIds[i] = getReplica(i).getBlockId();
+ stripedIds[i] = getStripedBlock(i).getBlockId();
+ }
+
+ DatanodeDescriptor dn1 = DFSTestUtil.getLocalDatanodeDescriptor();
+ DatanodeDescriptor dn2 = DFSTestUtil.getLocalDatanodeDescriptor();
+
+ // Add to corrupt blocks map.
+ // Replicas
+ addToCorruptReplicasMap(crm, getReplica(0), dn1);
+ verifyCorruptBlocksCount(crm, 1, 0);
+ addToCorruptReplicasMap(crm, getReplica(1), dn1);
+ verifyCorruptBlocksCount(crm, 2, 0);
+ addToCorruptReplicasMap(crm, getReplica(1), dn2);
+ verifyCorruptBlocksCount(crm, 2, 0);
+
+ // Striped blocks
+ addToCorruptReplicasMap(crm, getStripedBlock(0), dn1);
+ verifyCorruptBlocksCount(crm, 2, 1);
+ addToCorruptReplicasMap(crm, getStripedBlock(1), dn1);
+ verifyCorruptBlocksCount(crm, 2, 2);
+ addToCorruptReplicasMap(crm, getStripedBlock(1), dn2);
+ verifyCorruptBlocksCount(crm, 2, 2);
+
+ // Remove from corrupt blocks map.
+ // Replicas
+ crm.removeFromCorruptReplicasMap(getReplica(1));
+ verifyCorruptBlocksCount(crm, 1, 2);
+ crm.removeFromCorruptReplicasMap(getReplica(0));
+ verifyCorruptBlocksCount(crm, 0, 2);
+
+ // Striped blocks
+ crm.removeFromCorruptReplicasMap(getStripedBlock(1));
+ verifyCorruptBlocksCount(crm, 0, 1);
+ crm.removeFromCorruptReplicasMap(getStripedBlock(0));
+ verifyCorruptBlocksCount(crm, 0, 0);
+
+ for (int blockId = 0; blockId < blockCount; blockId++) {
+ addToCorruptReplicasMap(crm, getReplica(blockId), dn1);
+ addToCorruptReplicasMap(crm, getStripedBlock(blockId), dn1);
+ }
+
+ assertEquals("Number of corrupt blocks not returning correctly",
+ 2 * blockCount, crm.size());
+ assertTrue("First five corrupt replica blocks ids are not right!",
+ Arrays.equals(Arrays.copyOfRange(replicaIds, 0, 5),
+ crm.getCorruptBlockIdsForTesting(BlockType.CONTIGUOUS, 5, null)));
+ assertTrue("First five corrupt striped blocks ids are not right!",
+ Arrays.equals(Arrays.copyOfRange(stripedIds, 0, 5),
+ crm.getCorruptBlockIdsForTesting(BlockType.STRIPED, 5, null)));
+
+ assertTrue("10 replica blocks after 7 not returned correctly!",
+ Arrays.equals(Arrays.copyOfRange(replicaIds, 7, 17),
+ crm.getCorruptBlockIdsForTesting(BlockType.CONTIGUOUS, 10, 7L)));
+ assertTrue("10 striped blocks after 7 not returned correctly!",
+ Arrays.equals(Arrays.copyOfRange(stripedIds, 7, 17),
+ crm.getCorruptBlockIdsForTesting(BlockType.STRIPED,
+ 10, getStripedBlock(7).getBlockId())));
}
private static void addToCorruptReplicasMap(CorruptReplicasMap crm,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java
index d853762..c65fc64 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java
@@ -45,9 +45,32 @@ public class TestLowRedundancyBlockQueues {
return sblk;
}
+ private void verifyBlockStats(LowRedundancyBlocks queues,
+ int lowRedundancyReplicaCount, int corruptReplicaCount,
+ int corruptReplicationOneCount, int lowRedundancyStripedCount,
+ int corruptStripedCount) {
+ assertEquals("Low redundancy replica count incorrect!",
+ lowRedundancyReplicaCount, queues.getLowRedundancyBlocksStat());
+ assertEquals("Corrupt replica count incorrect!",
+ corruptReplicaCount, queues.getCorruptBlocksStat());
+ assertEquals("Corrupt replica one count incorrect!",
+ corruptReplicationOneCount,
+ queues.getCorruptReplicationOneBlocksStat());
+ assertEquals("Low redundancy striped blocks count incorrect!",
+ lowRedundancyStripedCount, queues.getLowRedundancyECBlockGroupsStat());
+ assertEquals("Corrupt striped blocks count incorrect!",
+ corruptStripedCount, queues.getCorruptECBlockGroupsStat());
+ assertEquals("Low Redundancy count incorrect!",
+ lowRedundancyReplicaCount + lowRedundancyStripedCount,
+ queues.getLowRedundancyBlockCount());
+ assertEquals("LowRedundancyBlocks queue size incorrect!",
+ (lowRedundancyReplicaCount + corruptReplicaCount +
+ lowRedundancyStripedCount + corruptStripedCount), queues.size());
+ }
+
/**
* Test that adding blocks with different replication counts puts them
- * into different queues
+ * into different queues.
* @throws Throwable if something goes wrong
*/
@Test
@@ -59,43 +82,45 @@ public class TestLowRedundancyBlockQueues {
BlockInfo block_corrupt = genBlockInfo(4);
BlockInfo block_corrupt_repl_one = genBlockInfo(5);
- //add a block with a single entry
+ // Add a block with a single entry
assertAdded(queues, block1, 1, 0, 3);
-
- assertEquals(1, queues.getLowRedundancyBlockCount());
- assertEquals(1, queues.size());
assertInLevel(queues, block1, LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY);
- //repeated additions fail
+ verifyBlockStats(queues, 1, 0, 0, 0, 0);
+
+ // Repeated additions fail
assertFalse(queues.add(block1, 1, 0, 0, 3));
+ verifyBlockStats(queues, 1, 0, 0, 0, 0);
- //add a second block with two replicas
+ // Add a second block with two replicas
assertAdded(queues, block2, 2, 0, 3);
- assertEquals(2, queues.getLowRedundancyBlockCount());
- assertEquals(2, queues.size());
assertInLevel(queues, block2, LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY);
- //now try to add a block that is corrupt
+ verifyBlockStats(queues, 2, 0, 0, 0, 0);
+
+ // Now try to add a block that is corrupt
assertAdded(queues, block_corrupt, 0, 0, 3);
- assertEquals(3, queues.size());
- assertEquals(2, queues.getLowRedundancyBlockCount());
- assertEquals(1, queues.getCorruptBlockSize());
assertInLevel(queues, block_corrupt,
LowRedundancyBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
+ verifyBlockStats(queues, 2, 1, 0, 0, 0);
- //insert a very insufficiently redundancy block
+ // Insert a very insufficiently redundancy block
assertAdded(queues, block_very_low_redundancy, 4, 0, 25);
assertInLevel(queues, block_very_low_redundancy,
LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY);
+ verifyBlockStats(queues, 3, 1, 0, 0, 0);
- //insert a corrupt block with replication factor 1
+ // Insert a corrupt block with replication factor 1
assertAdded(queues, block_corrupt_repl_one, 0, 0, 1);
- assertEquals(2, queues.getCorruptBlockSize());
- assertEquals(1, queues.getCorruptReplOneBlockSize());
+ verifyBlockStats(queues, 3, 2, 1, 0, 0);
+
+ // Bump up the expected count for corrupt replica one block from 1 to 3
queues.update(block_corrupt_repl_one, 0, 0, 0, 3, 0, 2);
- assertEquals(0, queues.getCorruptReplOneBlockSize());
+ verifyBlockStats(queues, 3, 2, 0, 0, 0);
+
+ // Reduce the expected replicas to 1
queues.update(block_corrupt, 0, 0, 0, 1, 0, -2);
- assertEquals(1, queues.getCorruptReplOneBlockSize());
+ verifyBlockStats(queues, 3, 2, 1, 0, 0);
queues.update(block_very_low_redundancy, 0, 0, 0, 1, -4, -24);
- assertEquals(2, queues.getCorruptReplOneBlockSize());
+ verifyBlockStats(queues, 2, 3, 2, 0, 0);
}
@Test
@@ -131,16 +156,18 @@ public class TestLowRedundancyBlockQueues {
assertInLevel(queues, block,
LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY);
}
+ verifyBlockStats(queues, 0, 0, 0, numUR, 0);
}
// add a corrupted block
BlockInfo block_corrupt = genStripedBlockInfo(-10, numBytes);
assertEquals(numCorrupt, queues.getCorruptBlockSize());
+ verifyBlockStats(queues, 0, 0, 0, numUR, numCorrupt);
+
assertAdded(queues, block_corrupt, dataBlkNum - 1, 0, groupSize);
numCorrupt++;
- assertEquals(numUR + numCorrupt, queues.size());
- assertEquals(numUR, queues.getLowRedundancyBlockCount());
- assertEquals(numCorrupt, queues.getCorruptBlockSize());
+ verifyBlockStats(queues, 0, 0, 0, numUR, numCorrupt);
+
assertInLevel(queues, block_corrupt,
LowRedundancyBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
index c0b54b0..e21d44e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
@@ -37,38 +37,51 @@ import java.util.Iterator;
public class TestUnderReplicatedBlocks {
- @Test(timeout=60000) // 1 min timeout
- public void testSetrepIncWithUnderReplicatedBlocks() throws Exception {
+ @Test(timeout=120000) // 1 min timeout
+ public void testSetRepIncWithUnderReplicatedBlocks() throws Exception {
Configuration conf = new HdfsConfiguration();
final short REPLICATION_FACTOR = 2;
final String FILE_NAME = "/testFile";
final Path FILE_PATH = new Path(FILE_NAME);
- MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION_FACTOR + 1).build();
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).
+ numDataNodes(REPLICATION_FACTOR + 1).build();
try {
// create a file with one block with a replication factor of 2
final FileSystem fs = cluster.getFileSystem();
+ final BlockManager bm = cluster.getNamesystem().getBlockManager();
DFSTestUtil.createFile(fs, FILE_PATH, 1L, REPLICATION_FACTOR, 1L);
DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR);
-
+ BlockManagerTestUtil.updateState(bm);
+ DFSTestUtil.verifyClientStats(conf, cluster);
+
// remove one replica from the blocksMap so block becomes under-replicated
// but the block does not get put into the under-replicated blocks queue
- final BlockManager bm = cluster.getNamesystem().getBlockManager();
ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
DatanodeDescriptor dn = bm.blocksMap.getStorages(b.getLocalBlock())
.iterator().next().getDatanodeDescriptor();
bm.addToInvalidates(b.getLocalBlock(), dn);
+
+
// Compute the invalidate work in NN, and trigger the heartbeat from DN
BlockManagerTestUtil.computeAllPendingWork(bm);
DataNodeTestUtils.triggerHeartbeat(cluster.getDataNode(dn.getIpcPort()));
// Wait to make sure the DataNode receives the deletion request
Thread.sleep(5000);
+ BlockManagerTestUtil.updateState(bm);
+ DFSTestUtil.verifyClientStats(conf, cluster);
+
// Remove the record from blocksMap
bm.blocksMap.removeNode(b.getLocalBlock(), dn);
-
+ BlockManagerTestUtil.updateState(bm);
+ DFSTestUtil.verifyClientStats(conf, cluster);
+
// increment this file's replication factor
FsShell shell = new FsShell(conf);
- assertEquals(0, shell.run(new String[]{
- "-setrep", "-w", Integer.toString(1+REPLICATION_FACTOR), FILE_NAME}));
+ assertEquals(0, shell.run(new String[] {
+ "-setrep", "-w", Integer.toString(1 + REPLICATION_FACTOR),
+ FILE_NAME }));
+ BlockManagerTestUtil.updateState(bm);
+ DFSTestUtil.verifyClientStats(conf, cluster);
} finally {
cluster.shutdown();
}
@@ -126,25 +139,30 @@ public class TestUnderReplicatedBlocks {
final BlockManager bm = cluster.getNamesystem().getBlockManager();
ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
Iterator<DatanodeStorageInfo> storageInfos =
- bm.blocksMap.getStorages(b.getLocalBlock())
- .iterator();
+ bm.blocksMap.getStorages(b.getLocalBlock()).iterator();
DatanodeDescriptor firstDn = storageInfos.next().getDatanodeDescriptor();
DatanodeDescriptor secondDn = storageInfos.next().getDatanodeDescriptor();
- bm.getDatanodeManager().removeDatanode(firstDn);
+ BlockManagerTestUtil.updateState(bm);
+ DFSTestUtil.verifyClientStats(conf, cluster);
+ bm.getDatanodeManager().removeDatanode(firstDn);
+ BlockManagerTestUtil.updateState(bm);
assertEquals(NUM_OF_BLOCKS, bm.getUnderReplicatedNotMissingBlocks());
- bm.computeDatanodeWork();
+ DFSTestUtil.verifyClientStats(conf, cluster);
+ bm.computeDatanodeWork();
assertTrue("The number of replication work pending before targets are " +
"determined should be non-negative.",
(Integer)Whitebox.getInternalState(secondDn,
"pendingReplicationWithoutTargets") >= 0);
+ BlockManagerTestUtil.updateState(bm);
assertTrue("The number of blocks to be replicated should be less than "
+ "or equal to " + bm.replicationStreamsHardLimit,
secondDn.getNumberOfBlocksToBeReplicated()
<= bm.replicationStreamsHardLimit);
+ DFSTestUtil.verifyClientStats(conf, cluster);
} finally {
cluster.shutdown();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java
index 90eb7d1..22cba6d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java
@@ -199,7 +199,7 @@ public class TestReadOnlySharedStorage {
assertThat(numberReplicas.replicasOnStaleNodes(), is(0));
BlockManagerTestUtil.updateState(blockManager);
- assertThat(blockManager.getUnderReplicatedBlocksCount(), is(0L));
+ assertThat(blockManager.getLowRedundancyBlocksCount(), is(0L));
assertThat(blockManager.getExcessBlocksCount(), is(0L));
}
@@ -238,7 +238,7 @@ public class TestReadOnlySharedStorage {
// The block should be reported as under-replicated
BlockManagerTestUtil.updateState(blockManager);
- assertThat(blockManager.getUnderReplicatedBlocksCount(), is(1L));
+ assertThat(blockManager.getLowRedundancyBlocksCount(), is(1L));
// The BlockManager should be able to heal the replication count back to 1
// by triggering an inter-datanode replication from one of the READ_ONLY_SHARED replicas
http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index d268d01..71a9f6f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -118,7 +118,7 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
assertThat(cluster.getNameNode()
.getNamesystem()
.getBlockManager()
- .getUnderReplicatedBlocksCount(),
+ .getLowRedundancyBlocksCount(),
is(0L));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
index 555c2fa..c556699 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
@@ -424,7 +424,9 @@ public class TestAddStripedBlocks {
cluster.getDataNodes().get(3).getDatanodeId(), reports[0]);
BlockManagerTestUtil.updateState(ns.getBlockManager());
// the total number of corrupted block info is still 1
+ Assert.assertEquals(1, ns.getCorruptECBlockGroupsStat());
Assert.assertEquals(1, ns.getCorruptReplicaBlocks());
+ Assert.assertEquals(0, ns.getCorruptBlocksStat());
// 2 internal blocks corrupted
Assert.assertEquals(2, bm.getCorruptReplicas(stored).size());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
index 3cf025c..11d7431 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
@@ -410,7 +410,7 @@ public class TestDecommissioningStatus {
// All nodes are dead and decommed. Blocks should be missing.
long missingBlocks = bm.getMissingBlocksCount();
- long underreplicated = bm.getUnderReplicatedBlocksCount();
+ long underreplicated = bm.getLowRedundancyBlocksCount();
assertTrue(missingBlocks > 0);
assertTrue(underreplicated > 0);
@@ -440,7 +440,7 @@ public class TestDecommissioningStatus {
// Blocks should be still be under-replicated
Thread.sleep(2000); // Let replication monitor run
- assertEquals(underreplicated, bm.getUnderReplicatedBlocksCount());
+ assertEquals(underreplicated, bm.getLowRedundancyBlocksCount());
// Start up a node.
LOG.info("Starting two more nodes");
@@ -448,13 +448,13 @@ public class TestDecommissioningStatus {
cluster.waitActive();
// Replication should fix it.
int count = 0;
- while((bm.getUnderReplicatedBlocksCount() > 0 ||
+ while((bm.getLowRedundancyBlocksCount() > 0 ||
bm.getPendingReconstructionBlocksCount() > 0) &&
count++ < 10) {
Thread.sleep(1000);
}
- assertEquals(0, bm.getUnderReplicatedBlocksCount());
+ assertEquals(0, bm.getLowRedundancyBlocksCount());
assertEquals(0, bm.getPendingReconstructionBlocksCount());
assertEquals(0, bm.getMissingBlocksCount());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
index ed9ed3a..32c2a49 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
@@ -774,17 +775,24 @@ public class TestNameNodeMXBean {
}
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- ObjectName mxbeanName = new ObjectName(
+ ObjectName replStateMBeanName = new ObjectName(
+ "Hadoop:service=NameNode,name=ReplicatedBlocksState");
+ ObjectName ecBlkGrpStateMBeanName = new ObjectName(
+ "Hadoop:service=NameNode,name=ECBlockGroupsState");
+ ObjectName namenodeMXBeanName = new ObjectName(
"Hadoop:service=NameNode,name=NameNodeInfo");
// Wait for the metrics to discover the unrecoverable block group
+ long expectedMissingBlockCount = 1L;
+ long expectedCorruptBlockCount = 1L;
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
Long numMissingBlocks =
- (Long) mbs.getAttribute(mxbeanName, "NumberOfMissingBlocks");
- if (numMissingBlocks == 1L) {
+ (Long) mbs.getAttribute(namenodeMXBeanName,
+ "NumberOfMissingBlocks");
+ if (numMissingBlocks == expectedMissingBlockCount) {
return true;
}
} catch (Exception e) {
@@ -794,7 +802,43 @@ public class TestNameNodeMXBean {
}
}, 1000, 60000);
- String corruptFiles = (String) (mbs.getAttribute(mxbeanName,
+ BlockManagerTestUtil.updateState(
+ cluster.getNamesystem().getBlockManager());
+
+ // Verification of missing blocks
+ long totalMissingBlocks = cluster.getNamesystem().getMissingBlocksCount();
+ Long replicaMissingBlocks =
+ (Long) mbs.getAttribute(replStateMBeanName,
+ "MissingBlocksStat");
+ Long ecMissingBlocks =
+ (Long) mbs.getAttribute(ecBlkGrpStateMBeanName,
+ "MissingECBlockGroupsStat");
+ assertEquals("Unexpected total missing blocks!",
+ expectedMissingBlockCount, totalMissingBlocks);
+ assertEquals("Unexpected total missing blocks!",
+ totalMissingBlocks,
+ (replicaMissingBlocks + ecMissingBlocks));
+ assertEquals("Unexpected total ec missing blocks!",
+ expectedMissingBlockCount, ecMissingBlocks.longValue());
+
+ // Verification of corrupt blocks
+ long totalCorruptBlocks =
+ cluster.getNamesystem().getCorruptReplicaBlocks();
+ Long replicaCorruptBlocks =
+ (Long) mbs.getAttribute(replStateMBeanName,
+ "CorruptBlocksStat");
+ Long ecCorruptBlocks =
+ (Long) mbs.getAttribute(ecBlkGrpStateMBeanName,
+ "CorruptECBlockGroupsStat");
+ assertEquals("Unexpected total corrupt blocks!",
+ expectedCorruptBlockCount, totalCorruptBlocks);
+ assertEquals("Unexpected total corrupt blocks!",
+ totalCorruptBlocks,
+ (replicaCorruptBlocks + ecCorruptBlocks));
+ assertEquals("Unexpected total ec corrupt blocks!",
+ expectedCorruptBlockCount, ecCorruptBlocks.longValue());
+
+ String corruptFiles = (String) (mbs.getAttribute(namenodeMXBeanName,
"CorruptFiles"));
int numCorruptFiles = ((Object[]) JSON.parse(corruptFiles)).length;
assertEquals(1, numCorruptFiles);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java
index 34fec5b..540ae63 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@@ -50,6 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.BitSet;
+import java.util.Iterator;
import java.util.List;
import static org.junit.Assert.assertEquals;
@@ -157,6 +159,8 @@ public class TestReconstructStripedBlocks {
assertEquals(numBlocks, missedNode.numBlocks());
bm.getDatanodeManager().removeDatanode(missedNode);
}
+ BlockManagerTestUtil.updateState(bm);
+ DFSTestUtil.verifyClientStats(conf, cluster);
BlockManagerTestUtil.getComputedDatanodeWork(bm);
@@ -185,6 +189,8 @@ public class TestReconstructStripedBlocks {
info.getSourceDnInfos().length);
}
}
+ BlockManagerTestUtil.updateState(bm);
+ DFSTestUtil.verifyClientStats(conf, cluster);
} finally {
cluster.shutdown();
}
@@ -212,6 +218,8 @@ public class TestReconstructStripedBlocks {
final byte[] data = new byte[fileLen];
DFSTestUtil.writeFile(fs, p, data);
DFSTestUtil.waitForReplication(fs, p, groupSize, 5000);
+ BlockManagerTestUtil.updateState(bm);
+ DFSTestUtil.verifyClientStats(conf, cluster);
LocatedStripedBlock lb = (LocatedStripedBlock)fs.getClient()
.getLocatedBlocks(p.toString(), 0).get(0);
@@ -219,16 +227,20 @@ public class TestReconstructStripedBlocks {
cellSize, dataBlocks, parityBlocks);
BlockManagerTestUtil.getComputedDatanodeWork(bm);
+ BlockManagerTestUtil.updateState(bm);
assertEquals(0, getNumberOfBlocksToBeErasureCoded(cluster));
assertEquals(0, bm.getPendingReconstructionBlocksCount());
+ DFSTestUtil.verifyClientStats(conf, cluster);
// missing 1 block, so 1 task should be scheduled
DatanodeInfo dn0 = lbs[0].getLocations()[0];
cluster.stopDataNode(dn0.getName());
cluster.setDataNodeDead(dn0);
BlockManagerTestUtil.getComputedDatanodeWork(bm);
+ BlockManagerTestUtil.updateState(bm);
assertEquals(1, getNumberOfBlocksToBeErasureCoded(cluster));
assertEquals(1, bm.getPendingReconstructionBlocksCount());
+ DFSTestUtil.verifyClientStats(conf, cluster);
// missing another block, but no new task should be scheduled because
// previous task isn't finished.
@@ -236,8 +248,10 @@ public class TestReconstructStripedBlocks {
cluster.stopDataNode(dn1.getName());
cluster.setDataNodeDead(dn1);
BlockManagerTestUtil.getComputedDatanodeWork(bm);
+ BlockManagerTestUtil.updateState(bm);
assertEquals(1, getNumberOfBlocksToBeErasureCoded(cluster));
assertEquals(1, bm.getPendingReconstructionBlocksCount());
+ DFSTestUtil.verifyClientStats(conf, cluster);
} finally {
cluster.shutdown();
}
@@ -294,6 +308,7 @@ public class TestReconstructStripedBlocks {
// bring the dn back: 10 internal blocks now
cluster.restartDataNode(dnProp);
cluster.waitActive();
+ DFSTestUtil.verifyClientStats(conf, cluster);
// stop another dn: 9 internal blocks, but only cover 8 real one
dnToStop = block.getLocations()[1];
@@ -352,4 +367,72 @@ public class TestReconstructStripedBlocks {
cluster.shutdown();
}
}
+
+ @Test(timeout=120000) // 2 min timeout
+ public void testReconstructionWork() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1);
+ conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
+ conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1000);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
+ 1000);
+ conf.setInt(
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
+ 5);
+
+ ErasureCodingPolicy policy = SystemErasureCodingPolicies.getByID(
+ SystemErasureCodingPolicies.XOR_2_1_POLICY_ID);
+ conf.setStrings(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
+ policy.getName());
+ Path ecDir = new Path("/ec");
+ Path ecFilePath = new Path(ecDir, "ec-file");
+ int blockGroups = 2;
+ int totalDataNodes = policy.getNumDataUnits() +
+ policy.getNumParityUnits() + 1;
+
+ MiniDFSCluster dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+ totalDataNodes).build();
+ try {
+ // create an EC file with 2 block groups
+ final DistributedFileSystem fs = dfsCluster.getFileSystem();
+ fs.mkdirs(ecDir);
+ fs.setErasureCodingPolicy(ecDir, policy.getName());
+ DFSTestUtil.createStripedFile(dfsCluster, ecFilePath, ecDir,
+ blockGroups, 2, false, policy);
+
+ final BlockManager bm = dfsCluster.getNamesystem().getBlockManager();
+ LocatedBlocks lbs = fs.getClient().getNamenode().getBlockLocations(
+ ecFilePath.toString(), 0, blockGroups);
+ org.junit.Assert.assertTrue(lbs.get(0) instanceof LocatedStripedBlock);
+ LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
+
+ Iterator<DatanodeStorageInfo> storageInfos =
+ bm.getStorages(bg.getBlock().getLocalBlock()).iterator();
+ DatanodeDescriptor firstDn = storageInfos.next().getDatanodeDescriptor();
+
+ BlockManagerTestUtil.updateState(bm);
+ DFSTestUtil.verifyClientStats(conf, dfsCluster);
+
+ // Remove a DataNode holding an internal block of the first block group
+ bm.getDatanodeManager().removeDatanode(firstDn);
+
+ // Verify low redundancy count matching EC block groups count
+ BlockManagerTestUtil.updateState(bm);
+ assertEquals(blockGroups, bm.getLowRedundancyECBlockGroupsStat());
+ DFSTestUtil.verifyClientStats(conf, dfsCluster);
+
+ // Trigger block group reconstruction explicitly; the redundancy monitor
+ // interval is configured to 1000 seconds above.
+ BlockManagerTestUtil.getComputedDatanodeWork(bm);
+ BlockManagerTestUtil.updateState(bm);
+
+ // Both groups are now scheduled for EC work and no longer low-redundancy
+ assertEquals(blockGroups, getNumberOfBlocksToBeErasureCoded(dfsCluster));
+ assertEquals(0, bm.getLowRedundancyECBlockGroupsStat());
+ DFSTestUtil.verifyClientStats(conf, dfsCluster);
+ } finally {
+ dfsCluster.shutdown();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
index 4ad742e..c84f8e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileSystemTestWrapper;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.client.CreateEncryptionZoneFlag;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
@@ -57,8 +58,12 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
@@ -90,17 +95,23 @@ public class TestNameNodeMetrics {
new Path("/testNameNodeMetrics");
private static final String NN_METRICS = "NameNodeActivity";
private static final String NS_METRICS = "FSNamesystem";
+ private static final int BLOCK_SIZE = 1024 * 1024;
+ private static final ErasureCodingPolicy EC_POLICY =
+ SystemErasureCodingPolicies.getByID(
+ SystemErasureCodingPolicies.XOR_2_1_POLICY_ID);
+
public static final Log LOG = LogFactory.getLog(TestNameNodeMetrics.class);
// Number of datanodes in the cluster
- private static final int DATANODE_COUNT = 3;
+ private static final int DATANODE_COUNT = EC_POLICY.getNumDataUnits() +
+ EC_POLICY.getNumParityUnits() + 1;
private static final int WAIT_GAUGE_VALUE_RETRIES = 20;
// Rollover interval of percentile metrics (in seconds)
private static final int PERCENTILES_INTERVAL = 1;
static {
- CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 100);
+ CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
CONF.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFS_REDUNDANCY_INTERVAL);
@@ -109,7 +120,11 @@ public class TestNameNodeMetrics {
CONF.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY,
"" + PERCENTILES_INTERVAL);
// Enable stale DataNodes checking
- CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
+ CONF.setBoolean(
+ DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
+ // Enable erasure coding
+ CONF.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
+ EC_POLICY.getName());
GenericTestUtils.setLogLevel(LogFactory.getLog(MetricsAsserts.class),
Level.DEBUG);
}
@@ -119,18 +134,23 @@ public class TestNameNodeMetrics {
private final Random rand = new Random();
private FSNamesystem namesystem;
private BlockManager bm;
+ private Path ecDir;
private static Path getTestPath(String fileName) {
return new Path(TEST_ROOT_DIR_PATH, fileName);
}
-
+
@Before
public void setUp() throws Exception {
- cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(DATANODE_COUNT).build();
+ cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(DATANODE_COUNT)
+ .build();
cluster.waitActive();
namesystem = cluster.getNamesystem();
bm = namesystem.getBlockManager();
fs = cluster.getFileSystem();
+ ecDir = getTestPath("/ec"); // NOTE(review): "/ec" is absolute; may not nest under TEST_ROOT_DIR_PATH - confirm
+ fs.mkdirs(ecDir);
+ fs.setErasureCodingPolicy(ecDir, EC_POLICY.getName());
}
@After
@@ -219,49 +239,125 @@ public class TestNameNodeMetrics {
/** Test metrics associated with addition of a file */
@Test
public void testFileAdd() throws Exception {
- // Add files with 100 blocks
- final Path file = getTestPath("testFileAdd");
- createFile(file, 3200, (short)3);
+ // Create a replicated file and an EC file, blockCount blocks/groups each
final long blockCount = 32;
+ final Path normalFile = getTestPath("testFileAdd");
+ createFile(normalFile, blockCount * BLOCK_SIZE, (short)3);
+ final Path ecFile = new Path(ecDir, "ecFile.log");
+ DFSTestUtil.createStripedFile(cluster, ecFile, null, (int) blockCount, 1,
+ false, EC_POLICY);
+
int blockCapacity = namesystem.getBlockCapacity();
assertGauge("BlockCapacity", blockCapacity, getMetrics(NS_METRICS));
MetricsRecordBuilder rb = getMetrics(NN_METRICS);
- // File create operations is 1
- // Number of files created is depth of <code>file</code> path
- assertCounter("CreateFileOps", 1L, rb);
- assertCounter("FilesCreated", (long)file.depth(), rb);
-
- long filesTotal = file.depth() + 1; // Add 1 for root
+ // File create operations are 2
+ assertCounter("CreateFileOps", 2L, rb);
+ // Number of files created is depth of normalFile and ecFile, after
+ // removing the duplicate accounting for root test dir.
+ assertCounter("FilesCreated",
+ (long)(normalFile.depth() + ecFile.depth()), rb);
+
+ long filesTotal = normalFile.depth() + ecFile.depth() + 1 /* NOTE(review): root inode rather than ecDir? confirm */;
rb = getMetrics(NS_METRICS);
assertGauge("FilesTotal", filesTotal, rb);
- assertGauge("BlocksTotal", blockCount, rb);
- fs.delete(file, true);
+ assertGauge("BlocksTotal", blockCount * 2, rb);
+ fs.delete(normalFile, true);
filesTotal--; // reduce the filecount for deleted file
rb = waitForDnMetricValue(NS_METRICS, "FilesTotal", filesTotal);
+ assertGauge("BlocksTotal", blockCount, rb);
+ assertGauge("PendingDeletionBlocks", 0L, rb);
+
+ fs.delete(ecFile, true);
+ filesTotal--;
+ rb = waitForDnMetricValue(NS_METRICS, "FilesTotal", filesTotal);
assertGauge("BlocksTotal", 0L, rb);
assertGauge("PendingDeletionBlocks", 0L, rb);
rb = getMetrics(NN_METRICS);
// Delete file operations and number of files deleted must be 1
- assertCounter("DeleteFileOps", 1L, rb);
- assertCounter("FilesDeleted", 1L, rb);
+ assertCounter("DeleteFileOps", 2L, rb);
+ assertCounter("FilesDeleted", 2L, rb);
}
-
+
+ /**
+ * Verify low-redundancy, pending and corrupt block metrics are all zero.
+ * @throws Exception propagated from the metrics helper calls
+ */
+ private void verifyZeroMetrics() throws Exception {
+ BlockManagerTestUtil.updateState(bm);
+ MetricsRecordBuilder rb = waitForDnMetricValue(NS_METRICS,
+ "CorruptBlocks", 0L, 500);
+
+ // Verify aggregated blocks metrics
+ assertGauge("UnderReplicatedBlocks", 0L, rb); // Deprecated metric
+ assertGauge("LowRedundancyBlocks", 0L, rb);
+ assertGauge("PendingReplicationBlocks", 0L, rb); // Deprecated metric
+ assertGauge("PendingReconstructionBlocks", 0L, rb);
+
+ // Verify replica metrics
+ assertGauge("LowRedundancyReplicatedBlocks", 0L, rb);
+ assertGauge("CorruptReplicatedBlocks", 0L, rb);
+
+ // Verify striped block groups metrics
+ assertGauge("LowRedundancyECBlockGroups", 0L, rb);
+ assertGauge("CorruptECBlockGroups", 0L, rb);
+ }
+
+ /**
+ * Verify each aggregated block metric equals the sum of its replicated
+ * and EC counterparts; UnderReplicatedBlocks must match LowRedundancy.
+ * @throws Exception propagated from the test helper calls
+ */
+ private void verifyAggregatedMetricsTally() throws Exception {
+ BlockManagerTestUtil.updateState(bm);
+ assertEquals("Under replicated metrics not matching!",
+ namesystem.getLowRedundancyBlocks(),
+ namesystem.getUnderReplicatedBlocks());
+ assertEquals("Low redundancy metrics not matching!",
+ namesystem.getLowRedundancyBlocks(),
+ namesystem.getLowRedundancyBlocksStat() +
+ namesystem.getLowRedundancyECBlockGroupsStat());
+ assertEquals("Corrupt blocks metrics not matching!",
+ namesystem.getCorruptReplicaBlocks(),
+ namesystem.getCorruptBlocksStat() +
+ namesystem.getCorruptECBlockGroupsStat());
+ assertEquals("Missing blocks metrics not matching!",
+ namesystem.getMissingBlocksCount(),
+ namesystem.getMissingBlocksStat() +
+ namesystem.getMissingECBlockGroupsStat());
+ assertEquals("Missing blocks with replication factor one not matching!",
+ namesystem.getMissingReplOneBlocksCount(),
+ namesystem.getMissingReplicationOneBlocksStat());
+ assertEquals("Bytes in future blocks metrics not matching!",
+ namesystem.getBytesInFuture(),
+ namesystem.getBlocksBytesInFutureStat() +
+ namesystem.getECBlocksBytesInFutureStat());
+ assertEquals("Pending deletion blocks metrics not matching!",
+ namesystem.getPendingDeletionBlocks(),
+ namesystem.getPendingDeletionBlocksStat() +
+ namesystem.getPendingDeletionECBlockGroupsStat());
+ }
+
/** Corrupt a block and ensure metrics reflects it */
@Test
public void testCorruptBlock() throws Exception {
// Create a file with single block with two replicas
final Path file = getTestPath("testCorruptBlock");
- createFile(file, 100, (short)2);
-
+ final short replicaCount = 2;
+ createFile(file, 100, replicaCount);
+ DFSTestUtil.waitForReplication(fs, file, replicaCount, 15000);
+
// Disable the heartbeats, so that no corrupted replica
// can be fixed
for (DataNode dn : cluster.getDataNodes()) {
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
}
-
+
+ verifyZeroMetrics();
+ verifyAggregatedMetricsTally();
+
// Corrupt first replica of the block
LocatedBlock block = NameNodeAdapter.getBlockLocations(
cluster.getNameNode(), file.toString(), 0, 1).get(0);
@@ -272,12 +368,50 @@ public class TestNameNodeMetrics {
} finally {
cluster.getNamesystem().writeUnlock();
}
+
+ BlockManagerTestUtil.updateState(bm);
+ MetricsRecordBuilder rb = waitForDnMetricValue(NS_METRICS,
+ "CorruptBlocks", 1L, 500);
+ // Verify aggregated blocks metrics
+ assertGauge("LowRedundancyBlocks", 1L, rb);
+ assertGauge("PendingReplicationBlocks", 0L, rb);
+ assertGauge("PendingReconstructionBlocks", 0L, rb);
+ // Verify replicated blocks metrics
+ assertGauge("LowRedundancyReplicatedBlocks", 1L, rb);
+ assertGauge("CorruptReplicatedBlocks", 1L, rb);
+ // Verify striped blocks metrics
+ assertGauge("LowRedundancyECBlockGroups", 0L, rb);
+ assertGauge("CorruptECBlockGroups", 0L, rb);
+
+ verifyAggregatedMetricsTally();
+
+ for (DataNode dn : cluster.getDataNodes()) {
+ DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
+ }
+
+ // Start block reconstruction work
BlockManagerTestUtil.getComputedDatanodeWork(bm);
- MetricsRecordBuilder rb = getMetrics(NS_METRICS);
- assertGauge("CorruptBlocks", 1L, rb);
- assertGauge("PendingReplicationBlocks", 1L, rb);
-
+
+ BlockManagerTestUtil.updateState(bm);
+ DFSTestUtil.waitForReplication(fs, file, replicaCount, 30000);
+ rb = waitForDnMetricValue(NS_METRICS, "CorruptBlocks", 0L, 500);
+
+ // Verify aggregated blocks metrics
+ assertGauge("LowRedundancyBlocks", 0L, rb);
+ assertGauge("CorruptBlocks", 0L, rb);
+ assertGauge("PendingReplicationBlocks", 0L, rb);
+ assertGauge("PendingReconstructionBlocks", 0L, rb);
+ // Verify replicated blocks metrics
+ assertGauge("LowRedundancyReplicatedBlocks", 0L, rb);
+ assertGauge("CorruptReplicatedBlocks", 0L, rb);
+ // Verify striped blocks metrics
+ assertGauge("LowRedundancyECBlockGroups", 0L, rb);
+ assertGauge("CorruptECBlockGroups", 0L, rb);
+
+ verifyAggregatedMetricsTally();
+
fs.delete(file, true);
+ BlockManagerTestUtil.getComputedDatanodeWork(bm);
// During the file deletion, both BlockManager#corruptReplicas and
// BlockManager#pendingReplications will be updated, i.e., the records
// for the blocks of the deleted file will be removed from both
@@ -287,11 +421,97 @@ public class TestNameNodeMetrics {
// BlockManager#updateState is called. And in
// BlockManager#computeDatanodeWork the metric ScheduledReplicationBlocks
// will also be updated.
- rb = waitForDnMetricValue(NS_METRICS, "CorruptBlocks", 0L);
+ BlockManagerTestUtil.updateState(bm);
+ waitForDnMetricValue(NS_METRICS, "CorruptBlocks", 0L, 500);
+ verifyZeroMetrics();
+ verifyAggregatedMetricsTally();
+ }
+
+ @Test (timeout = 90000L)
+ public void testStripedFileCorruptBlocks() throws Exception {
+ final long fileLen = BLOCK_SIZE * 4;
+ final Path ecFile = new Path(ecDir, "ecFile.log");
+ DFSTestUtil.createFile(fs, ecFile, fileLen, (short) 1, 0L);
+ StripedFileTestUtil.waitBlockGroupsReported(fs, ecFile.toString());
+
+ // Disable the heartbeats, so that no corrupted replica
+ // can be fixed
+ for (DataNode dn : cluster.getDataNodes()) {
+ DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+ }
+
+ verifyZeroMetrics();
+ verifyAggregatedMetricsTally();
+
+ // Corrupt the internal block on the first location of the block group
+ LocatedBlocks lbs = fs.getClient().getNamenode().getBlockLocations(
+ ecFile.toString(), 0, fileLen);
+ assertTrue(lbs.get(0) instanceof LocatedStripedBlock);
+ LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
+
+ cluster.getNamesystem().writeLock();
+ try {
+ bm.findAndMarkBlockAsCorrupt(bg.getBlock(), bg.getLocations()[0],
+ "STORAGE_ID", "TEST");
+ } finally {
+ cluster.getNamesystem().writeUnlock();
+ }
+
+ BlockManagerTestUtil.updateState(bm);
+ MetricsRecordBuilder rb = waitForDnMetricValue(NS_METRICS,
+ "CorruptBlocks", 1L, 500);
+ // Verify aggregated blocks metrics
+ assertGauge("LowRedundancyBlocks", 1L, rb);
assertGauge("PendingReplicationBlocks", 0L, rb);
- assertGauge("ScheduledReplicationBlocks", 0L, rb);
+ assertGauge("PendingReconstructionBlocks", 0L, rb);
+ // Verify replicated blocks metrics
+ assertGauge("LowRedundancyReplicatedBlocks", 0L, rb);
+ assertGauge("CorruptReplicatedBlocks", 0L, rb);
+ // Verify striped block groups metrics
+ assertGauge("LowRedundancyECBlockGroups", 1L, rb);
+ assertGauge("CorruptECBlockGroups", 1L, rb);
+
+ verifyAggregatedMetricsTally();
+
+ for (DataNode dn : cluster.getDataNodes()) {
+ DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
+ }
+
+ // Start block reconstruction work
+ BlockManagerTestUtil.getComputedDatanodeWork(bm);
+ BlockManagerTestUtil.updateState(bm);
+ StripedFileTestUtil.waitForReconstructionFinished(ecFile, fs, 3);
+
+ rb = waitForDnMetricValue(NS_METRICS, "CorruptBlocks", 0L, 500);
+ assertGauge("CorruptBlocks", 0L, rb);
+ assertGauge("PendingReplicationBlocks", 0L, rb);
+ assertGauge("PendingReconstructionBlocks", 0L, rb);
+ // Verify replicated blocks metrics
+ assertGauge("LowRedundancyReplicatedBlocks", 0L, rb);
+ assertGauge("CorruptReplicatedBlocks", 0L, rb);
+ // Verify striped blocks metrics
+ assertGauge("LowRedundancyECBlockGroups", 0L, rb);
+ assertGauge("CorruptECBlockGroups", 0L, rb);
+
+ verifyAggregatedMetricsTally();
+
+ fs.delete(ecFile, true);
+ BlockManagerTestUtil.getComputedDatanodeWork(bm);
+ // During the file deletion, both BlockManager#corruptReplicas and
+ // BlockManager#pendingReplications will be updated, i.e., the records
+ // for the blocks of the deleted file will be removed from both
+ // corruptReplicas and pendingReplications. The corresponding
+ // metrics (CorruptBlocks and PendingReplicationBlocks) will only be updated
+ // when BlockManager#computeDatanodeWork is run where the
+ // BlockManager#updateState is called. And in
+ // BlockManager#computeDatanodeWork the metric ScheduledReplicationBlocks
+ // will also be updated.
+ BlockManagerTestUtil.updateState(bm);
+ waitForDnMetricValue(NS_METRICS, "CorruptBlocks", 0L, 500);
+ verifyZeroMetrics();
+ verifyAggregatedMetricsTally();
}
-
+
/** Create excess blocks by reducing the replication factor for
* for a file and ensure metrics reflects it
*/
@@ -340,7 +560,7 @@ public class TestNameNodeMetrics {
private void waitForDeletion() throws InterruptedException {
- // Wait for more than DATANODE_COUNT replication intervals to ensure all
+ // Wait for DATANODE_COUNT replication intervals so that all
// the blocks pending deletion are sent for deletion to the datanodes.
- Thread.sleep(DFS_REDUNDANCY_INTERVAL * (DATANODE_COUNT + 1) * 1000);
+ Thread.sleep(DFS_REDUNDANCY_INTERVAL * DATANODE_COUNT * 1000);
}
/**
@@ -358,20 +578,25 @@ public class TestNameNodeMetrics {
* @throws Exception if something went wrong.
*/
private MetricsRecordBuilder waitForDnMetricValue(String source,
- String name,
- long expected)
- throws Exception {
+ String name, long expected) throws Exception {
+ // Initial wait, then poll every DFS_REDUNDANCY_INTERVAL * 500 ms.
+ waitForDeletion();
+ return waitForDnMetricValue(source, name, expected,
+ DFS_REDUNDANCY_INTERVAL * 500);
+ }
+
+ private MetricsRecordBuilder waitForDnMetricValue(String source,
+ String name, long expected, long sleepInterval) throws Exception {
MetricsRecordBuilder rb;
long gauge;
- //initial wait.
- waitForDeletion();
- //lots of retries are allowed for slow systems; fast ones will still
- //exit early
- int retries = (DATANODE_COUNT + 1) * WAIT_GAUGE_VALUE_RETRIES;
+ // Lots of retries are allowed for slow systems.
+ // Fast ones will still exit early.
+ int retries = DATANODE_COUNT * WAIT_GAUGE_VALUE_RETRIES;
rb = getMetrics(source);
gauge = MetricsAsserts.getLongGauge(name, rb);
while (gauge != expected && (--retries > 0)) {
- Thread.sleep(DFS_REDUNDANCY_INTERVAL * 500);
+ Thread.sleep(sleepInterval);
+ BlockManagerTestUtil.updateState(bm);
rb = getMetrics(source);
gauge = MetricsAsserts.getLongGauge(name, rb);
}
@@ -516,22 +741,22 @@ public class TestNameNodeMetrics {
getMetrics(NS_METRICS));
assertGauge("LastCheckpointTime", lastCkptTime, getMetrics(NS_METRICS));
- assertGauge("LastWrittenTransactionId", 1L, getMetrics(NS_METRICS));
- assertGauge("TransactionsSinceLastCheckpoint", 1L, getMetrics(NS_METRICS));
- assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS));
+ assertGauge("LastWrittenTransactionId", 3L, getMetrics(NS_METRICS));
+ assertGauge("TransactionsSinceLastCheckpoint", 3L, getMetrics(NS_METRICS));
+ assertGauge("TransactionsSinceLastLogRoll", 3L, getMetrics(NS_METRICS));
fs.mkdirs(new Path(TEST_ROOT_DIR_PATH, "/tmp"));
assertGauge("LastCheckpointTime", lastCkptTime, getMetrics(NS_METRICS));
- assertGauge("LastWrittenTransactionId", 2L, getMetrics(NS_METRICS));
- assertGauge("TransactionsSinceLastCheckpoint", 2L, getMetrics(NS_METRICS));
- assertGauge("TransactionsSinceLastLogRoll", 2L, getMetrics(NS_METRICS));
+ assertGauge("LastWrittenTransactionId", 4L, getMetrics(NS_METRICS));
+ assertGauge("TransactionsSinceLastCheckpoint", 4L, getMetrics(NS_METRICS));
+ assertGauge("TransactionsSinceLastLogRoll", 4L, getMetrics(NS_METRICS));
cluster.getNameNodeRpc().rollEditLog();
assertGauge("LastCheckpointTime", lastCkptTime, getMetrics(NS_METRICS));
- assertGauge("LastWrittenTransactionId", 4L, getMetrics(NS_METRICS));
- assertGauge("TransactionsSinceLastCheckpoint", 4L, getMetrics(NS_METRICS));
+ assertGauge("LastWrittenTransactionId", 6L, getMetrics(NS_METRICS));
+ assertGauge("TransactionsSinceLastCheckpoint", 6L, getMetrics(NS_METRICS));
assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS));
cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_ENTER, false);
@@ -541,7 +766,7 @@ public class TestNameNodeMetrics {
long newLastCkptTime = MetricsAsserts.getLongGauge("LastCheckpointTime",
getMetrics(NS_METRICS));
assertTrue(lastCkptTime < newLastCkptTime);
- assertGauge("LastWrittenTransactionId", 6L, getMetrics(NS_METRICS));
+ assertGauge("LastWrittenTransactionId", 8L, getMetrics(NS_METRICS));
assertGauge("TransactionsSinceLastCheckpoint", 1L, getMetrics(NS_METRICS));
assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS));
}
@@ -554,10 +779,10 @@ public class TestNameNodeMetrics {
public void testSyncAndBlockReportMetric() throws Exception {
MetricsRecordBuilder rb = getMetrics(NN_METRICS);
// We have one sync when the cluster starts up, just opening the journal
- assertCounter("SyncsNumOps", 1L, rb);
+ assertCounter("SyncsNumOps", 3L, rb);
// Each datanode reports in when the cluster comes up
assertCounter("BlockReportNumOps",
- (long)DATANODE_COUNT * cluster.getStoragesPerDatanode(), rb);
+ (long) DATANODE_COUNT * cluster.getStoragesPerDatanode(), rb);
// Sleep for an interval+slop to let the percentiles rollover
Thread.sleep((PERCENTILES_INTERVAL+1)*1000);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org