You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by he...@apache.org on 2021/09/17 14:19:58 UTC
[hadoop] 05/06: HDFS-15113. Missing IBR when NameNode restart if
open processCommand async feature. Contributed by Xiaoqiao He.
This is an automated email from the ASF dual-hosted git repository.
hexiaoqiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 50b0f0dc42d5cdcfbd381527ff774f20852888ba
Author: He Xiaoqiao <he...@apache.org>
AuthorDate: Fri Mar 13 18:54:36 2020 -0700
HDFS-15113. Missing IBR when NameNode restart if open processCommand async feature. Contributed by Xiaoqiao He.
Signed-off-by: Wei-Chiu Chuang <we...@apache.org>
Reviewed-by: Brahma Reddy Battula <br...@apache.org>
Reviewed-by: Inigo Goiri <in...@apache.org>
(cherry picked from commit e9955bb8ff44396eb478f709307d647ca884de99)
---
.../hdfs/server/datanode/BPServiceActor.java | 9 +-
.../server/datanode/DataNodeFaultInjector.java | 5 +
.../hdfs/server/datanode/TestBPOfferService.java | 112 +++++++++++++++++++++
3 files changed, 123 insertions(+), 3 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index e8c64d2..ea91402 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -891,14 +891,17 @@ class BPServiceActor implements Runnable {
// re-retrieve namespace info to make sure that, if the NN
// was restarted, we still match its version (HDFS-2120)
NamespaceInfo nsInfo = retrieveNamespaceInfo();
- // and re-register
- register(nsInfo);
- scheduler.scheduleHeartbeat();
// HDFS-9917,Standby NN IBR can be very huge if standby namenode is down
// for sometime.
if (state == HAServiceState.STANDBY || state == HAServiceState.OBSERVER) {
ibrManager.clearIBRs();
}
+ // HDFS-15113. Register and trigger an FBR after clearing the IBRs, to avoid
+ // missing some block reports to the Standby until the next FBR.
+ // and re-register
+ register(nsInfo);
+ scheduler.scheduleHeartbeat();
+ DataNodeFaultInjector.get().blockUtilSendFullBlockReport();
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index 821717b..4d3d220 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -134,4 +134,9 @@ public class DataNodeFaultInjector {
* into an erasure coding reconstruction.
*/
public void badDecoding(ByteBuffer[] outputs) {}
+
+ /**
+ * Used as a hook to inject an interception point during re-registration.
+ */
+ public void blockUtilSendFullBlockReport() {}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 98af2fa..79ad5d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset.SimulatedStorage;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -107,6 +109,8 @@ public class TestBPOfferService {
private long firstLeaseId = 0;
private long secondLeaseId = 0;
private long nextFullBlockReportLeaseId = 1L;
+ private int fullBlockReportCount = 0;
+ private int incrBlockReportCount = 0;
static {
GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
@@ -229,6 +233,14 @@ public class TestBPOfferService {
}
}
+ private void setBlockReportCount(int count) {
+ fullBlockReportCount = count;
+ }
+
+ private void setIncreaseBlockReportCount(int count) {
+ incrBlockReportCount += count;
+ }
+
/**
* Test that the BPOS can register to talk to two different NNs,
* sends block reports to both, etc.
@@ -267,6 +279,76 @@ public class TestBPOfferService {
}
}
+ /**
+ * HDFS-15113: Test and verify that no block reports are missed when re-registering.
+ */
+ @Test
+ public void testMissBlocksWhenReregister() throws Exception {
+ BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
+ bpos.start();
+ try {
+ waitForBothActors(bpos);
+ waitForInitialization(bpos);
+
+ DataNodeFaultInjector.set(new DataNodeFaultInjector() {
+ public void blockUtilSendFullBlockReport() {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ countBlockReportItems(FAKE_BLOCK, mockNN1);
+ int totalTestBlocks = 4000;
+ Thread addNewBlockThread = new Thread(() -> {
+ for (int i = 0; i < totalTestBlocks; i++) {
+ SimulatedFSDataset fsDataset = (SimulatedFSDataset) mockFSDataset;
+ SimulatedStorage simulatedStorage = fsDataset.getStorages().get(0);
+ String storageId = simulatedStorage.getStorageUuid();
+ ExtendedBlock b = new ExtendedBlock(bpos.getBlockPoolId(), i, 0, i);
+ try {
+ fsDataset.createRbw(StorageType.DEFAULT, storageId, b, false);
+ bpos.notifyNamenodeReceivingBlock(b, storageId);
+ fsDataset.finalizeBlock(b, false);
+ Thread.sleep(1);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ addNewBlockThread.start();
+
+ // Make sure that blocks have been generated for the DataNode and that the IBR queue is not empty now.
+ Thread.sleep(200);
+ // Trigger re-register using DataNode Command.
+ datanodeCommands[0] = new DatanodeCommand[]{RegisterCommand.REGISTER};
+ bpos.triggerHeartbeatForTests();
+
+ try {
+ GenericTestUtils.waitFor(() -> {
+ if(fullBlockReportCount == totalTestBlocks ||
+ incrBlockReportCount == totalTestBlocks) {
+ return true;
+ }
+ return false;
+ }, 1000, 15000);
+ } catch (Exception e) {}
+
+ // Verify that the FBR/IBR count is equal to the number of generated blocks.
+ assertTrue(fullBlockReportCount == totalTestBlocks ||
+ incrBlockReportCount == totalTestBlocks);
+ } finally {
+ bpos.stop();
+ bpos.join();
+
+ DataNodeFaultInjector.set(new DataNodeFaultInjector() {
+ public void blockUtilSendFullBlockReport() {}
+ });
+ }
+ }
+
@Test
public void testLocklessBlockPoolId() throws Exception {
BPOfferService bpos = Mockito.spy(setupBPOSForNNs(mockNN1));
@@ -615,6 +697,36 @@ public class TestBPOfferService {
secondCallTime = Time.now();
}
}
+
+ private void countBlockReportItems(final ExtendedBlock fakeBlock,
+ final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
+ final String fakeBlockPoolId = fakeBlock.getBlockPoolId();
+ final ArgumentCaptor<StorageBlockReport[]> captor =
+ ArgumentCaptor.forClass(StorageBlockReport[].class);
+
+ Mockito.doAnswer((Answer<Object>) invocation -> {
+ Object[] arguments = invocation.getArguments();
+ StorageBlockReport[] list = (StorageBlockReport[])arguments[2];
+ setBlockReportCount(list[0].getBlocks().getNumberOfBlocks());
+ return null;
+ }).when(mockNN).blockReport(
+ Mockito.any(),
+ Mockito.eq(fakeBlockPoolId),
+ captor.capture(),
+ Mockito.any()
+ );
+
+ Mockito.doAnswer((Answer<Object>) invocation -> {
+ Object[] arguments = invocation.getArguments();
+ StorageReceivedDeletedBlocks[] list =
+ (StorageReceivedDeletedBlocks[])arguments[2];
+ setIncreaseBlockReportCount(list[0].getBlocks().length);
+ return null;
+ }).when(mockNN).blockReceivedAndDeleted(
+ Mockito.any(),
+ Mockito.eq(fakeBlockPoolId),
+ Mockito.any());
+ }
private class BPOfferServiceSynchronousCallAnswer implements Answer<Void> {
private final int nnIdx;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org