Posted to common-commits@hadoop.apache.org by su...@apache.org on 2020/02/13 11:31:56 UTC
[hadoop] branch trunk updated: HDFS-15086. Block scheduled counter never gets decremented if the block got deleted before replication. Contributed by hemanthboyina.
This is an automated email from the ASF dual-hosted git repository.
surendralilhore pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new a98352c HDFS-15086. Block scheduled counter never gets decremented if the block got deleted before replication. Contributed by hemanthboyina.
a98352c is described below
commit a98352ced18e51003b443e1a652d19ec00b2f2d2
Author: Surendra Singh Lilhore <su...@apache.org>
AuthorDate: Thu Feb 13 16:30:19 2020 +0530
HDFS-15086. Block scheduled counter never gets decremented if the block got deleted before replication. Contributed by hemanthboyina.
---
.../hdfs/server/blockmanagement/BlockManager.java | 18 ++++--
.../server/blockmanagement/DatanodeManager.java | 20 +++++-
.../PendingReconstructionBlocks.java | 42 ++++++++----
.../hadoop/hdfs/TestBlocksScheduledCounter.java | 75 +++++++++++++++++++++-
.../blockmanagement/TestPendingReconstruction.java | 26 ++++----
5 files changed, 146 insertions(+), 35 deletions(-)
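
The bug in one sentence: DatanodeDescriptor#getBlocksScheduled is incremented when a reconstruction is scheduled to a DataNode and decremented when that DataNode reports the new replica, but nothing decremented it when the block was deleted while the work was still pending, so the counter stayed inflated forever. A minimal, self-contained sketch of that lifecycle, using illustrative names rather than the real HDFS classes:

import java.util.HashMap;
import java.util.Map;

class BlocksScheduledSketch {
  // Per-DataNode count of replications scheduled but not yet confirmed.
  private final Map<String, Integer> blocksScheduled = new HashMap<>();

  void onScheduled(String datanode) {
    blocksScheduled.merge(datanode, 1, Integer::sum);   // increment path
  }

  void onReplicaReported(String datanode) {
    blocksScheduled.merge(datanode, -1, Integer::sum);  // normal decrement
  }

  void onBlockDeleted(Iterable<String> pendingTargets) {
    // Missing before HDFS-15086: a delete while the work was still pending
    // must also release the counter for every unconfirmed target.
    for (String datanode : pendingTargets) {
      blocksScheduled.merge(datanode, -1, Integer::sum);
    }
  }
}

The diff below closes that leak at two points: BlockManager.removeBlock() drains the counters for blocks removed from pendingReconstruction, and DatanodeManager drops already-deleted blocks from the per-heartbeat work list.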
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index c35087a..45d3b79 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState;
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.PendingReconstructionBlocks.PendingBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
@@ -1112,15 +1113,15 @@ public class BlockManager implements BlockStatsMXBean {
DatanodeStorageInfo[] expectedStorages =
blk.getUnderConstructionFeature().getExpectedStorageLocations();
if (expectedStorages.length - blk.numNodes() > 0) {
- ArrayList<DatanodeDescriptor> pendingNodes = new ArrayList<>();
+ ArrayList<DatanodeStorageInfo> pendingNodes = new ArrayList<>();
for (DatanodeStorageInfo storage : expectedStorages) {
DatanodeDescriptor dnd = storage.getDatanodeDescriptor();
if (blk.findStorageInfo(dnd) == null) {
- pendingNodes.add(dnd);
+ pendingNodes.add(storage);
}
}
pendingReconstruction.increment(blk,
- pendingNodes.toArray(new DatanodeDescriptor[pendingNodes.size()]));
+ pendingNodes.toArray(new DatanodeStorageInfo[pendingNodes.size()]));
}
}
}
@@ -2170,8 +2171,7 @@ public class BlockManager implements BlockStatsMXBean {
// Move the block-replication into a "pending" state.
// The reason we use 'pending' is so we can retry
// reconstructions that fail after an appropriate amount of time.
- pendingReconstruction.increment(block,
- DatanodeStorageInfo.toDatanodeDescriptors(targets));
+ pendingReconstruction.increment(block, targets);
blockLog.debug("BLOCK* block {} is moved from neededReconstruction to "
+ "pendingReconstruction", block);
@@ -4084,7 +4084,7 @@ public class BlockManager implements BlockStatsMXBean {
BlockInfo storedBlock = getStoredBlock(block);
if (storedBlock != null &&
block.getGenerationStamp() == storedBlock.getGenerationStamp()) {
- if (pendingReconstruction.decrement(storedBlock, node)) {
+ if (pendingReconstruction.decrement(storedBlock, storageInfo)) {
NameNode.getNameNodeMetrics().incSuccessfulReReplications();
}
}
@@ -4499,7 +4499,11 @@ public class BlockManager implements BlockStatsMXBean {
addToInvalidates(block);
removeBlockFromMap(block);
// Remove the block from pendingReconstruction and neededReconstruction
- pendingReconstruction.remove(block);
+ PendingBlockInfo remove = pendingReconstruction.remove(block);
+ if (remove != null) {
+ DatanodeStorageInfo.decrementBlocksScheduled(remove.getTargets()
+ .toArray(new DatanodeStorageInfo[remove.getTargets().size()]));
+ }
neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL);
postponedMisreplicatedBlocks.remove(block);
}
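
The removeBlock() hunk above is the heart of the fix: remove() now returns the pending entry instead of void, so the caller can release the scheduled count of every target that never completed. A simplified stand-in for the pattern (not the real BlockManager types):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RemoveAndDrainSketch {
  static class PendingEntry {
    private final List<String> targets;
    PendingEntry(List<String> targets) { this.targets = targets; }
    List<String> getTargets() { return targets; }
  }

  private final Map<Long, PendingEntry> pending = new HashMap<>();
  private final Map<String, Integer> scheduled = new HashMap<>();

  // Returning the removed entry (instead of void) is the key API change.
  PendingEntry remove(long blockId) {
    return pending.remove(blockId);
  }

  void removeBlock(long blockId) {
    PendingEntry removed = remove(blockId);
    if (removed != null) {
      // Release the scheduled count for every target that never completed.
      for (String target : removed.getTargets()) {
        scheduled.merge(target, -1, Integer::sum);
      }
    }
  }
}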
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 8adb03d..6ec9d18 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1713,8 +1713,24 @@ public class DatanodeManager {
List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
numReplicationTasks);
if (pendingList != null && !pendingList.isEmpty()) {
- cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
- pendingList));
+ // If the block has been deleted, its reported size becomes
+ // BlockCommand.NO_ACK (Long.MAX_VALUE). Such blocks do not need
+ // to be sent to the DataNode for replication or reconstruction.
+ Iterator<BlockTargetPair> iterator = pendingList.iterator();
+ while (iterator.hasNext()) {
+ BlockTargetPair cmd = iterator.next();
+ if (cmd.block != null
+ && cmd.block.getNumBytes() == BlockCommand.NO_ACK) {
+ // block deleted
+ DatanodeStorageInfo.decrementBlocksScheduled(cmd.targets);
+ iterator.remove();
+ }
+ }
+ if (!pendingList.isEmpty()) {
+ cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
+ pendingList));
+ }
}
// check pending erasure coding tasks
List<BlockECReconstructionInfo> pendingECList = nodeinfo
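
The DatanodeManager hunk handles the race from the DataNode side: by the time the heartbeat hands out work, a queued block may already have been deleted, which the NameNode signals by setting the block length to BlockCommand.NO_ACK (Long.MAX_VALUE). A self-contained sketch of the same filtering idiom, with stand-in types rather than the HDFS classes:

import java.util.Iterator;
import java.util.List;

class NoAckFilterSketch {
  static final long NO_ACK = Long.MAX_VALUE; // mirrors BlockCommand.NO_ACK

  static class PendingTask {
    final long numBytes;
    final String target;
    PendingTask(long numBytes, String target) {
      this.numBytes = numBytes;
      this.target = target;
    }
  }

  static void dropDeleted(List<PendingTask> pending) {
    Iterator<PendingTask> it = pending.iterator();
    while (it.hasNext()) {
      PendingTask task = it.next();
      if (task.numBytes == NO_ACK) {
        // Deleted while queued: release the counter, skip the transfer.
        decrementScheduled(task.target);
        it.remove();
      }
    }
  }

  static void decrementScheduled(String target) {
    System.out.println("blocksScheduled-- for " + target);
  }
}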
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java
index fb93d82..6e1af57 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java
@@ -81,7 +81,7 @@ class PendingReconstructionBlocks {
* @param block The corresponding block
* @param targets The DataNodes where replicas of the block should be placed
*/
- void increment(BlockInfo block, DatanodeDescriptor... targets) {
+ void increment(BlockInfo block, DatanodeStorageInfo... targets) {
synchronized (pendingReconstructions) {
PendingBlockInfo found = pendingReconstructions.get(block);
if (found == null) {
@@ -101,7 +101,7 @@ class PendingReconstructionBlocks {
* @param dn The DataNode that finishes the reconstruction
* @return true if the block is decremented to 0 and got removed.
*/
- boolean decrement(BlockInfo block, DatanodeDescriptor dn) {
+ boolean decrement(BlockInfo block, DatanodeStorageInfo dn) {
boolean removed = false;
synchronized (pendingReconstructions) {
PendingBlockInfo found = pendingReconstructions.get(block);
@@ -124,9 +124,9 @@ class PendingReconstructionBlocks {
* The given block whose pending reconstruction requests need to be
* removed
*/
- void remove(BlockInfo block) {
+ PendingBlockInfo remove(BlockInfo block) {
synchronized (pendingReconstructions) {
- pendingReconstructions.remove(block);
+ return pendingReconstructions.remove(block);
}
}
@@ -200,11 +200,11 @@ class PendingReconstructionBlocks {
*/
static class PendingBlockInfo {
private long timeStamp;
- private final List<DatanodeDescriptor> targets;
+ private final List<DatanodeStorageInfo> targets;
- PendingBlockInfo(DatanodeDescriptor[] targets) {
+ PendingBlockInfo(DatanodeStorageInfo[] targets) {
this.timeStamp = monotonicNow();
- this.targets = targets == null ? new ArrayList<DatanodeDescriptor>()
+ this.targets = targets == null ? new ArrayList<DatanodeStorageInfo>()
: new ArrayList<>(Arrays.asList(targets));
}
@@ -216,9 +216,9 @@ class PendingReconstructionBlocks {
timeStamp = monotonicNow();
}
- void incrementReplicas(DatanodeDescriptor... newTargets) {
+ void incrementReplicas(DatanodeStorageInfo... newTargets) {
if (newTargets != null) {
- for (DatanodeDescriptor newTarget : newTargets) {
+ for (DatanodeStorageInfo newTarget : newTargets) {
if (!targets.contains(newTarget)) {
targets.add(newTarget);
}
@@ -226,13 +226,23 @@ class PendingReconstructionBlocks {
}
}
- void decrementReplicas(DatanodeDescriptor dn) {
- targets.remove(dn);
+ void decrementReplicas(DatanodeStorageInfo dn) {
+ Iterator<DatanodeStorageInfo> iterator = targets.iterator();
+ while (iterator.hasNext()) {
+ DatanodeStorageInfo next = iterator.next();
+ if (next.getDatanodeDescriptor() == dn.getDatanodeDescriptor()) {
+ iterator.remove();
+ }
+ }
}
int getNumReplicas() {
return targets.size();
}
+
+ List<DatanodeStorageInfo> getTargets() {
+ return targets;
+ }
}
/*
@@ -318,4 +328,14 @@ class PendingReconstructionBlocks {
}
}
}
+
+ List<DatanodeStorageInfo> getTargets(BlockInfo block) {
+ synchronized (pendingReconstructions) {
+ PendingBlockInfo found = pendingReconstructions.get(block);
+ if (found != null) {
+ return found.targets;
+ }
+ }
+ return null;
+ }
}
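
A subtlety in the decrementReplicas() change above: now that the targets are DatanodeStorageInfo objects, the storage that reports the finished replica may not be the exact storage that was scheduled on that node, so the match is made on the owning DatanodeDescriptor rather than on the storage object itself. An illustration with simplified stand-in types:

import java.util.Iterator;
import java.util.List;

class DecrementByNodeSketch {
  static class Node {}          // stand-in for DatanodeDescriptor
  static class Storage {        // stand-in for DatanodeStorageInfo
    final Node node;
    Storage(Node node) { this.node = node; }
    Node getNode() { return node; }
  }

  // Remove every scheduled target that lives on the same node as the
  // storage that reported the finished replica.
  static void decrementReplicas(List<Storage> targets, Storage reported) {
    Iterator<Storage> it = targets.iterator();
    while (it.hasNext()) {
      if (it.next().getNode() == reported.getNode()) { // node identity
        it.remove();
      }
    }
  }
}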
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java
index 1894278..95f4e2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java
@@ -22,11 +22,19 @@ import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.ArrayList;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.junit.After;
import org.junit.Test;
@@ -129,4 +137,69 @@ public class TestBlocksScheduledCounter {
0, descriptor.getBlocksScheduled());
}
}
-}
+
+ /**
+ * Test that the blocks scheduled counter is decremented when the file
+ * whose blocks were scheduled for replication is deleted.
+ * @throws Exception
+ */
+ @Test
+ public void testScheduledBlocksCounterDecrementOnDeletedBlock()
+ throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
+ conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).build();
+ cluster.waitActive();
+ BlockManager bm = cluster.getNamesystem().getBlockManager();
+ try {
+ DistributedFileSystem dfs = cluster.getFileSystem();
+ // 1. create a file
+ Path filePath = new Path("/tmp.txt");
+ DFSTestUtil.createFile(dfs, filePath, 1024, (short) 3, 0L);
+
+ // 2. disable the heartbeats
+ for (DataNode dn : cluster.getDataNodes()) {
+ DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+ }
+
+ DatanodeManager datanodeManager =
+ cluster.getNamesystem().getBlockManager().getDatanodeManager();
+ ArrayList<DatanodeDescriptor> dnList =
+ new ArrayList<DatanodeDescriptor>();
+ datanodeManager.fetchDatanodes(dnList, dnList, false);
+
+ // 3. mark a couple of blocks as corrupt
+ LocatedBlock block = NameNodeAdapter
+ .getBlockLocations(cluster.getNameNode(), filePath.toString(), 0, 1)
+ .get(0);
+ DatanodeInfo[] locs = block.getLocations();
+ cluster.getNamesystem().writeLock();
+ try {
+ bm.findAndMarkBlockAsCorrupt(block.getBlock(), locs[0], "STORAGE_ID",
+ "TEST");
+ bm.findAndMarkBlockAsCorrupt(block.getBlock(), locs[1], "STORAGE_ID",
+ "TEST");
+ BlockManagerTestUtil.computeAllPendingWork(bm);
+ BlockManagerTestUtil.updateState(bm);
+ assertEquals(1L, bm.getPendingReconstructionBlocksCount());
+ } finally {
+ cluster.getNamesystem().writeUnlock();
+ }
+
+ // 4. delete the file
+ dfs.delete(filePath, true);
+ int blocksScheduled = 0;
+ for (DatanodeDescriptor descriptor : dnList) {
+ if (descriptor.getBlocksScheduled() != 0) {
+ blocksScheduled += descriptor.getBlocksScheduled();
+ }
+ }
+ assertEquals(0, blocksScheduled);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+}
\ No newline at end of file
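
One note on the new test: it reads the counters once, immediately after delete(). If timing ever makes that flaky, the same assertion could be polled with the GenericTestUtils.waitFor() idiom this patch itself uses in TestPendingReconstruction below; a suggestion only, not part of the commit:

GenericTestUtils.waitFor(() -> {
  int scheduled = 0;
  for (DatanodeDescriptor descriptor : dnList) {
    scheduled += descriptor.getBlocksScheduled();
  }
  return scheduled == 0;
}, 500, 10000);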
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java
index aa57a4a..8fcb0fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java
@@ -89,8 +89,7 @@ public class TestPendingReconstruction {
BlockInfo block = genBlockInfo(i, i, 0);
DatanodeStorageInfo[] targets = new DatanodeStorageInfo[i];
System.arraycopy(storages, 0, targets, 0, i);
- pendingReconstructions.increment(block,
- DatanodeStorageInfo.toDatanodeDescriptors(targets));
+ pendingReconstructions.increment(block, targets);
}
assertEquals("Size of pendingReconstruction ",
10, pendingReconstructions.size());
@@ -100,25 +99,24 @@ public class TestPendingReconstruction {
// remove one item
//
BlockInfo blk = genBlockInfo(8, 8, 0);
- pendingReconstructions.decrement(blk, storages[7].getDatanodeDescriptor()); // removes one replica
+ pendingReconstructions.decrement(blk, storages[7]); // removes one replica
assertEquals("pendingReconstructions.getNumReplicas ",
7, pendingReconstructions.getNumReplicas(blk));
//
// insert the same item twice should be counted as once
//
- pendingReconstructions.increment(blk, storages[0].getDatanodeDescriptor());
+ pendingReconstructions.increment(blk, storages[0]);
assertEquals("pendingReconstructions.getNumReplicas ",
7, pendingReconstructions.getNumReplicas(blk));
for (int i = 0; i < 7; i++) {
// removes all replicas
- pendingReconstructions.decrement(blk, storages[i].getDatanodeDescriptor());
+ pendingReconstructions.decrement(blk, storages[i]);
}
assertTrue(pendingReconstructions.size() == 9);
pendingReconstructions.increment(blk,
- DatanodeStorageInfo.toDatanodeDescriptors(
- DFSTestUtil.createDatanodeStorageInfos(8)));
+ DFSTestUtil.createDatanodeStorageInfos(8));
assertTrue(pendingReconstructions.size() == 10);
//
@@ -148,8 +146,7 @@ public class TestPendingReconstruction {
for (int i = 10; i < 15; i++) {
BlockInfo block = genBlockInfo(i, i, 0);
pendingReconstructions.increment(block,
- DatanodeStorageInfo.toDatanodeDescriptors(
- DFSTestUtil.createDatanodeStorageInfos(i)));
+ DFSTestUtil.createDatanodeStorageInfos(i));
}
assertEquals(15, pendingReconstructions.size());
assertEquals(0L, pendingReconstructions.getNumTimedOuts());
@@ -217,8 +214,7 @@ public class TestPendingReconstruction {
blockInfo = new BlockInfoContiguous(block, (short) 3);
pendingReconstruction.increment(blockInfo,
- DatanodeStorageInfo.toDatanodeDescriptors(
- DFSTestUtil.createDatanodeStorageInfos(1)));
+ DFSTestUtil.createDatanodeStorageInfos(1));
BlockCollection bc = Mockito.mock(BlockCollection.class);
// Place into blocksmap with GenerationStamp = 1
blockInfo.setGenerationStamp(1);
@@ -234,8 +230,7 @@ public class TestPendingReconstruction {
block = new Block(2, 2, 0);
blockInfo = new BlockInfoContiguous(block, (short) 3);
pendingReconstruction.increment(blockInfo,
- DatanodeStorageInfo.toDatanodeDescriptors(
- DFSTestUtil.createDatanodeStorageInfos(1)));
+ DFSTestUtil.createDatanodeStorageInfos(1));
// verify 2 blocks in pendingReconstructions
assertEquals("Size of pendingReconstructions ", 2,
@@ -281,7 +276,8 @@ public class TestPendingReconstruction {
getDatanodes().iterator().next() };
// Add a stored block to the pendingReconstruction.
- pendingReconstruction.increment(storedBlock, desc);
+ pendingReconstruction.increment(blockInfo,
+ DFSTestUtil.createDatanodeStorageInfos(1));
assertEquals("Size of pendingReconstructions ", 1,
pendingReconstruction.size());
@@ -310,6 +306,8 @@ public class TestPendingReconstruction {
fsn.writeUnlock();
}
+ GenericTestUtils.waitFor(() -> pendingReconstruction.size() == 0, 500,
+ 10000);
// The pending queue should be empty.
assertEquals("Size of pendingReconstructions ", 0,
pendingReconstruction.size());