Posted to common-commits@hadoop.apache.org by su...@apache.org on 2019/11/01 16:47:06 UTC
[hadoop] branch trunk updated: HDFS-14768. EC: Busy DN replica should be considered in live replica check. Contributed by guojh.
This is an automated email from the ASF dual-hosted git repository.
surendralilhore pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 02009c3 HDFS-14768. EC: Busy DN replica should be considered in live replica check. Contributed by guojh.
02009c3 is described below
commit 02009c3bb762393540cdf92cfd9c840807272903
Author: Surendra Singh Lilhore <su...@apache.org>
AuthorDate: Fri Nov 1 20:34:09 2019 +0530
HDFS-14768. EC: Busy DN replica should be considered in live replica check. Contributed by guojh.
---
.../hdfs/server/blockmanagement/BlockManager.java | 33 +++++++----
.../server/blockmanagement/DatanodeDescriptor.java | 3 +-
.../server/blockmanagement/ErasureCodingWork.java | 12 +++-
.../hadoop/hdfs/TestDecommissionWithStriped.java | 68 +++++++++++++++++++++-
.../server/blockmanagement/TestBlockManager.java | 17 +++++-
5 files changed, 113 insertions(+), 20 deletions(-)
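In short: before this change, chooseSourceDatanodes() dropped a datanode that had reached its replication-stream limit before its replica was counted, so a live EC internal block on a busy node looked missing and the NameNode could schedule needless reconstruction for it (the decommission test below exercises exactly that). The patch counts the replica state first, collects live-but-busy internal block indices into a new list, threads that list through to ErasureCodingWork, and lets hasAllInternalBlocks() treat busy replicas as present. Short illustrative sketches follow each file's diff below.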
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 4c75733..3b76eec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -854,7 +854,7 @@ public class BlockManager implements BlockStatsMXBean {
}
// source node returned is not used
chooseSourceDatanodes(blockInfo, containingNodes,
- containingLiveReplicasNodes, numReplicas,
+ containingLiveReplicasNodes, numReplicas, new ArrayList<Byte>(),
new ArrayList<Byte>(), LowRedundancyBlocks.LEVEL);
// containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are
@@ -2024,9 +2024,10 @@ public class BlockManager implements BlockStatsMXBean {
List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
NumberReplicas numReplicas = new NumberReplicas();
List<Byte> liveBlockIndices = new ArrayList<>();
+ List<Byte> liveBusyBlockIndices = new ArrayList<>();
final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block,
containingNodes, liveReplicaNodes, numReplicas,
- liveBlockIndices, priority);
+ liveBlockIndices, liveBusyBlockIndices, priority);
short requiredRedundancy = getExpectedLiveRedundancyNum(block,
numReplicas);
if(srcNodes == null || srcNodes.length == 0) {
@@ -2079,9 +2080,13 @@ public class BlockManager implements BlockStatsMXBean {
for (int i = 0 ; i < liveBlockIndices.size(); i++) {
indices[i] = liveBlockIndices.get(i);
}
+ byte[] busyIndices = new byte[liveBusyBlockIndices.size()];
+ for (int i = 0; i < liveBusyBlockIndices.size(); i++) {
+ busyIndices[i] = liveBusyBlockIndices.get(i);
+ }
return new ErasureCodingWork(getBlockPoolId(), block, bc, srcNodes,
containingNodes, liveReplicaNodes, additionalReplRequired,
- priority, indices);
+ priority, indices, busyIndices);
} else {
return new ReplicationWork(block, bc, srcNodes,
containingNodes, liveReplicaNodes, additionalReplRequired,
@@ -2293,8 +2298,8 @@ public class BlockManager implements BlockStatsMXBean {
DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
List<DatanodeDescriptor> containingNodes,
List<DatanodeStorageInfo> nodesContainingLiveReplicas,
- NumberReplicas numReplicas,
- List<Byte> liveBlockIndices, int priority) {
+ NumberReplicas numReplicas, List<Byte> liveBlockIndices,
+ List<Byte> liveBusyBlockIndices, int priority) {
containingNodes.clear();
nodesContainingLiveReplicas.clear();
List<DatanodeDescriptor> srcNodes = new ArrayList<>();
@@ -2347,12 +2352,6 @@ public class BlockManager implements BlockStatsMXBean {
continue;
}
- if (priority != LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY
- && (!node.isDecommissionInProgress() && !node.isEnteringMaintenance())
- && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) {
- continue; // already reached replication limit
- }
-
// for EC here need to make sure the numReplicas replicates state correct
// because in the scheduleReconstruction it need the numReplicas to check
// whether need to reconstruct the ec internal block
@@ -2364,7 +2363,19 @@ public class BlockManager implements BlockStatsMXBean {
liveBitSet, decommissioningBitSet, blockIndex);
}
+ if (priority != LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY
+ && (!node.isDecommissionInProgress() && !node.isEnteringMaintenance())
+ && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) {
+ if (isStriped && state == StoredReplicaState.LIVE) {
+ liveBusyBlockIndices.add(blockIndex);
+ }
+ continue; // already reached replication limit
+ }
+
if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit) {
+ if (isStriped && state == StoredReplicaState.LIVE) {
+ liveBusyBlockIndices.add(blockIndex);
+ }
continue;
}
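The heart of the fix is the hunk above: the busy-node checks now run after the replica's state has been counted, and when a striped block's live replica sits on a node over the soft limit (maxReplicationStreams, bypassed for highest-priority work) or the hard limit (replicationStreamsHardLimit), its index is recorded in liveBusyBlockIndices instead of vanishing from the accounting. A minimal, self-contained sketch of that bookkeeping, using hypothetical names and simplified types rather than Hadoop's real classes:

    import java.util.ArrayList;
    import java.util.List;

    class BusySourceSketch {
      // Stand-in for the NameNode's maxReplicationStreams limit; the real
      // diff records the index at both the soft- and hard-limit checks.
      static final int MAX_REPLICATION_STREAMS = 2;

      // One replica of an EC internal block: its index, whether it is
      // live, and how many streams its datanode already carries.
      record Replica(byte blockIndex, boolean live, int pendingStreams) {}

      public static void main(String[] args) {
        List<Replica> replicas = List.of(
            new Replica((byte) 0, true, 0),   // idle node: usable source
            new Replica((byte) 1, true, 5),   // busy node, block still live
            new Replica((byte) 2, false, 0)); // replica not live: ignored
        List<Byte> liveBlockIndices = new ArrayList<>();
        List<Byte> liveBusyBlockIndices = new ArrayList<>();
        for (Replica r : replicas) {
          if (!r.live()) {
            continue;
          }
          if (r.pendingStreams() >= MAX_REPLICATION_STREAMS) {
            // Cannot serve as a reconstruction source, but the internal
            // block exists; record it so it is not treated as missing.
            liveBusyBlockIndices.add(r.blockIndex());
            continue;
          }
          liveBlockIndices.add(r.blockIndex());
        }
        System.out.println("sources=" + liveBlockIndices
            + " busy=" + liveBusyBlockIndices); // sources=[0] busy=[1]
      }
    }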
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 9fa1de8..9035fd3 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -634,7 +634,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
return new BlockIterator(startBlock, getStorageInfos());
}
- void incrementPendingReplicationWithoutTargets() {
+ @VisibleForTesting
+ public void incrementPendingReplicationWithoutTargets() {
pendingReplicationWithoutTargets++;
}
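The only change here is visibility: incrementPendingReplicationWithoutTargets() becomes public and @VisibleForTesting so that the new test below can bump a datanode's pending-replication count up to the hard stream limit and make it look busy to chooseSourceDatanodes().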
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
index 5d06293..8de3f38 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
@@ -31,6 +31,7 @@ import java.util.Set;
class ErasureCodingWork extends BlockReconstructionWork {
private final byte[] liveBlockIndicies;
+ private final byte[] liveBusyBlockIndicies;
private final String blockPoolId;
public ErasureCodingWork(String blockPoolId, BlockInfo block,
@@ -38,12 +39,13 @@ class ErasureCodingWork extends BlockReconstructionWork {
DatanodeDescriptor[] srcNodes,
List<DatanodeDescriptor> containingNodes,
List<DatanodeStorageInfo> liveReplicaStorages,
- int additionalReplRequired,
- int priority, byte[] liveBlockIndicies) {
+ int additionalReplRequired, int priority,
+ byte[] liveBlockIndicies, byte[] liveBusyBlockIndicies) {
super(block, bc, srcNodes, containingNodes,
liveReplicaStorages, additionalReplRequired, priority);
this.blockPoolId = blockPoolId;
this.liveBlockIndicies = liveBlockIndicies;
+ this.liveBusyBlockIndicies = liveBusyBlockIndicies;
LOG.debug("Creating an ErasureCodingWork to {} reconstruct ",
block);
}
@@ -70,13 +72,17 @@ class ErasureCodingWork extends BlockReconstructionWork {
*/
private boolean hasAllInternalBlocks() {
final BlockInfoStriped block = (BlockInfoStriped) getBlock();
- if (getSrcNodes().length < block.getRealTotalBlockNum()) {
+ if (liveBlockIndicies.length
+ + liveBusyBlockIndicies.length < block.getRealTotalBlockNum()) {
return false;
}
BitSet bitSet = new BitSet(block.getTotalBlockNum());
for (byte index : liveBlockIndicies) {
bitSet.set(index);
}
+ for (byte busyIndex: liveBusyBlockIndicies) {
+ bitSet.set(busyIndex);
+ }
for (int i = 0; i < block.getRealDataBlockNum(); i++) {
if (!bitSet.get(i)) {
return false;
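hasAllInternalBlocks() used to gate on getSrcNodes().length, which undercounts whenever live replicas sit on busy nodes; it now sizes the check on live plus live-busy indices and marks both sets in the bitset. A small runnable sketch of the merged check, assuming illustrative names and a 9-block (6 data + 3 parity) layout, and simplifying the real method's separate data/parity loops into one:

    import java.util.BitSet;

    class InternalBlocksSketch {
      // True when every internal block index in [0, realTotal) is held
      // somewhere, counting replicas on busy datanodes as present.
      static boolean hasAllInternalBlocks(byte[] liveIndices,
          byte[] liveBusyIndices, int realTotal) {
        if (liveIndices.length + liveBusyIndices.length < realTotal) {
          return false; // too few replicas to cover every index
        }
        BitSet bitSet = new BitSet(realTotal);
        for (byte index : liveIndices) {
          bitSet.set(index);
        }
        for (byte busyIndex : liveBusyIndices) {
          bitSet.set(busyIndex); // busy replicas still count as present
        }
        for (int i = 0; i < realTotal; i++) {
          if (!bitSet.get(i)) {
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args) {
        // 9 internal blocks; index 1 lives only on a busy datanode.
        byte[] live = {0, 2, 3, 4, 5, 6, 7, 8};
        byte[] busy = {1};
        System.out.println(hasAllInternalBlocks(live, busy, 9)); // true
      }
    }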
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
index 0031878..f4a99e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
@@ -40,14 +40,16 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
@@ -55,6 +57,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.security.token.Token;
@@ -81,6 +84,9 @@ public class TestDecommissionWithStriped {
// replication interval
private static final int NAMENODE_REPLICATION_INTERVAL = 1;
+ private int replicationStreamsHardLimit =
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT;
+
private Path decommissionDir;
private Path hostsFile;
private Path excludeFile;
@@ -273,7 +279,6 @@ public class TestDecommissionWithStriped {
fsn.getNumDecomLiveDataNodes());
// Ensure decommissioned datanode is not automatically shutdown
- DFSClient client = getDfsClient(cluster.getNameNode(0), conf);
assertEquals("All datanodes must be alive", numDNs,
client.datanodeReport(DatanodeReportType.LIVE).length);
@@ -284,6 +289,65 @@ public class TestDecommissionWithStriped {
}
/**
+ * DN decommission shouldn't trigger reconstruction of a busy DN's block.
+ * @throws Exception
+ */
+ @Test(timeout = 120000)
+ public void testDecommissionWithBusyNode() throws Exception {
+ byte busyDNIndex = 1;
+ byte decommisionDNIndex = 0;
+ //1. create EC file
+ final Path ecFile = new Path(ecDir, "testDecommissionWithBusyNode");
+ int writeBytes = cellSize * dataBlocks;
+ writeStripedFile(dfs, ecFile, writeBytes);
+ Assert.assertEquals(0, bm.numOfUnderReplicatedBlocks());
+ FileChecksum fileChecksum1 = dfs.getFileChecksum(ecFile, writeBytes);
+
+ //2. make one DN busy
+ final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
+ .getINode4Write(ecFile.toString()).asFile();
+ BlockInfo firstBlock = fileNode.getBlocks()[0];
+ DatanodeStorageInfo[] dnStorageInfos = bm.getStorages(firstBlock);
+ DatanodeDescriptor busyNode =
+ dnStorageInfos[busyDNIndex].getDatanodeDescriptor();
+ for (int j = 0; j < replicationStreamsHardLimit; j++) {
+ busyNode.incrementPendingReplicationWithoutTargets();
+ }
+
+ //3. decommission one node
+ List<DatanodeInfo> decommisionNodes = new ArrayList<>();
+ decommisionNodes.add(
+ dnStorageInfos[decommisionDNIndex].getDatanodeDescriptor());
+ decommissionNode(0, decommisionNodes, AdminStates.DECOMMISSIONED);
+ assertEquals(decommisionNodes.size(), fsn.getNumDecomLiveDataNodes());
+
+ //4. wait for the decommissioned node's block to be reconstructed
+ Thread.sleep(3000);
+ DatanodeStorageInfo[] newDnStorageInfos = bm.getStorages(firstBlock);
+ Assert.assertEquals("Busy DN shouldn't be reconstructed",
+ dnStorageInfos[busyDNIndex].getStorageID(),
+ newDnStorageInfos[busyDNIndex].getStorageID());
+
+ //5. check the decommissioned DN's block index; it should be reconstructed again
+ LocatedBlocks lbs = cluster.getNameNodeRpc().getBlockLocations(
+ ecFile.toString(), 0, writeBytes);
+ LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
+ int decommissionBlockIndexCount = 0;
+ for (byte index : bg.getBlockIndices()) {
+ if (index == decommisionDNIndex) {
+ decommissionBlockIndexCount++;
+ }
+ }
+
+ Assert.assertEquals("Decommission DN block should be reconstructed", 2,
+ decommissionBlockIndexCount);
+
+ FileChecksum fileChecksum2 = dfs.getFileChecksum(ecFile, writeBytes);
+ Assert.assertTrue("Checksum mismatches!",
+ fileChecksum1.equals(fileChecksum2));
+ }
+
+ /**
* Tests to verify that the file checksum should be able to compute after the
* decommission operation.
*
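A note on the assertions in the new test: the busy datanode's storage must be untouched (same storage ID before and after), while the decommissioned node's block index is expected to show up twice in the located block group, once on the decommissioning node, whose replica stays readable and is still reported, and once on the node holding the reconstructed copy; the final checksum comparison confirms the file's data survived intact.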
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index ba88afe..237cacc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -660,6 +660,7 @@ public class TestBlockManager {
liveNodes,
new NumberReplicas(),
new ArrayList<Byte>(),
+ new ArrayList<Byte>(),
LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY)[0]);
assertEquals("Does not choose a source node for a less-than-highest-priority"
@@ -671,6 +672,7 @@ public class TestBlockManager {
liveNodes,
new NumberReplicas(),
new ArrayList<Byte>(),
+ new ArrayList<Byte>(),
LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY).length);
// Increase the replication count to test replication count > hard limit
@@ -685,6 +687,7 @@ public class TestBlockManager {
liveNodes,
new NumberReplicas(),
new ArrayList<Byte>(),
+ new ArrayList<Byte>(),
LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY).length);
}
@@ -730,13 +733,15 @@ public class TestBlockManager {
List<DatanodeStorageInfo> liveNodes = new LinkedList<DatanodeStorageInfo>();
NumberReplicas numReplicas = new NumberReplicas();
List<Byte> liveBlockIndices = new ArrayList<>();
+ List<Byte> liveBusyBlockIndices = new ArrayList<>();
bm.chooseSourceDatanodes(
aBlockInfoStriped,
cntNodes,
liveNodes,
numReplicas, liveBlockIndices,
- LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY);
+ liveBusyBlockIndices,
+ LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY);
assertEquals("Choose the source node for reconstruction with one node reach"
+ " the MAX maxReplicationStreams, the numReplicas still return the"
@@ -791,12 +796,14 @@ public class TestBlockManager {
new LinkedList<DatanodeStorageInfo>();
NumberReplicas numReplicas = new NumberReplicas();
List<Byte> liveBlockIndices = new ArrayList<>();
+ List<Byte> liveBusyBlockIndices = new ArrayList<>();
bm.chooseSourceDatanodes(
aBlockInfoStriped,
containingNodes,
nodesContainingLiveReplicas,
numReplicas, liveBlockIndices,
+ liveBusyBlockIndices,
LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY);
assertEquals("There are 5 live replicas in " +
"[ds2, ds3, ds4, ds5, ds6] datanodes ",
@@ -828,7 +835,9 @@ public class TestBlockManager {
bm.getStoredBlock(aBlock),
cntNodes,
liveNodes,
- new NumberReplicas(), new LinkedList<Byte>(),
+ new NumberReplicas(),
+ new LinkedList<Byte>(),
+ new ArrayList<Byte>(),
LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY)[0]);
@@ -842,7 +851,9 @@ public class TestBlockManager {
bm.getStoredBlock(aBlock),
cntNodes,
liveNodes,
- new NumberReplicas(), new LinkedList<Byte>(),
+ new NumberReplicas(),
+ new LinkedList<Byte>(),
+ new ArrayList<Byte>(),
LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY).length);
}
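The remaining churn in the tests is mechanical: every chooseSourceDatanodes() call site gains the extra List<Byte> argument for live-busy indices. The one behavioral tweak is the switch from QUEUE_HIGHEST_PRIORITY to QUEUE_VERY_LOW_REDUNDANCY in the striped case above; since the soft stream limit is bypassed for highest-priority work, the lower priority is what actually makes the saturated node register as busy.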
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org