You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by he...@apache.org on 2021/09/17 14:19:58 UTC

[hadoop] 05/06: HDFS-15113. Missing IBR when NameNode restart if open processCommand async feature. Contributed by Xiaoqiao He.

This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 50b0f0dc42d5cdcfbd381527ff774f20852888ba
Author: He Xiaoqiao <he...@apache.org>
AuthorDate: Fri Mar 13 18:54:36 2020 -0700

    HDFS-15113. Missing IBR when NameNode restart if open processCommand async feature. Contributed by Xiaoqiao He.
    
    Signed-off-by: Wei-Chiu Chuang <we...@apache.org>
    Reviewed-by: Brahma Reddy Battula <br...@apache.org>
    Reviewed-by: Inigo Goiri <in...@apache.org>
    (cherry picked from commit e9955bb8ff44396eb478f709307d647ca884de99)
---
 .../hdfs/server/datanode/BPServiceActor.java       |   9 +-
 .../server/datanode/DataNodeFaultInjector.java     |   5 +
 .../hdfs/server/datanode/TestBPOfferService.java   | 112 +++++++++++++++++++++
 3 files changed, 123 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index e8c64d2..ea91402 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -891,14 +891,17 @@ class BPServiceActor implements Runnable {
       // re-retrieve namespace info to make sure that, if the NN
       // was restarted, we still match its version (HDFS-2120)
       NamespaceInfo nsInfo = retrieveNamespaceInfo();
-      // and re-register
-      register(nsInfo);
-      scheduler.scheduleHeartbeat();
       // HDFS-9917,Standby NN IBR can be very huge if standby namenode is down
       // for sometime.
       if (state == HAServiceState.STANDBY || state == HAServiceState.OBSERVER) {
         ibrManager.clearIBRs();
       }
+      // HDFS-15113: re-register (and thereby trigger the FBR) only after the
+      // IBRs are cleared; otherwise blocks received in between could stay
+      // missing on the Standby until the next FBR.
+      register(nsInfo);
+      scheduler.scheduleHeartbeat();
+      DataNodeFaultInjector.get().blockUtilSendFullBlockReport();
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index 821717b..4d3d220 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -134,4 +134,9 @@ public class DataNodeFaultInjector {
    * into an erasure coding reconstruction.
    */
   public void badDecoding(ByteBuffer[] outputs) {}
+
+  /**
+   * Hook called by re-registration right after the heartbeat is scheduled.
+   */
+  public void blockUtilSendFullBlockReport() {}
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 98af2fa..79ad5d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset.SimulatedStorage;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -107,6 +109,8 @@ public class TestBPOfferService {
   private long firstLeaseId = 0;
   private long secondLeaseId = 0;
   private long nextFullBlockReportLeaseId = 1L;
+  private volatile int fullBlockReportCount = 0; // mock NN writes, test reads
+  private volatile int incrBlockReportCount = 0; // mock NN writes, test reads
 
   static {
     GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
@@ -229,6 +233,14 @@ public class TestBPOfferService {
     }
   }
 
+  private void setBlockReportCount(int count) {
+    fullBlockReportCount = count; // replaces the previous value
+  }
+
+  private void setIncreaseBlockReportCount(int count) {
+    incrBlockReportCount += count; // adds to, not replaces, the running total
+  }
+
   /**
    * Test that the BPOS can register to talk to two different NNs,
    * sends block reports to both, etc.
@@ -267,6 +279,76 @@ public class TestBPOfferService {
     }
   }
 
+  /**
+   * HDFS-15113: verify that no block goes unreported when the DataNode is
+   * told to re-register while new blocks keep arriving: either the FBR or
+   * the accumulated IBRs sent to NN1 must cover every generated block.
+   */
+  @Test
+  public void testMissBlocksWhenReregister() throws Exception {
+    BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
+    bpos.start();
+    // Restore whatever injector was active before, not a fresh default.
+    final DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+    try {
+      waitForBothActors(bpos);
+      waitForInitialization(bpos);
+
+      // Stall re-registration right after the heartbeat is scheduled.
+      DataNodeFaultInjector.set(new DataNodeFaultInjector() {
+        @Override
+        public void blockUtilSendFullBlockReport() {
+          try {
+            Thread.sleep(200);
+          } catch (InterruptedException e) {
+            // Preserve the interrupt status for the calling thread.
+            Thread.currentThread().interrupt();
+          }
+        }
+      });
+
+      countBlockReportItems(FAKE_BLOCK, mockNN1);
+      final int totalTestBlocks = 4000;
+      final SimulatedFSDataset fsDataset = (SimulatedFSDataset) mockFSDataset;
+      final SimulatedStorage simulatedStorage = fsDataset.getStorages().get(0);
+      final String storageId = simulatedStorage.getStorageUuid();
+      Thread addNewBlockThread = new Thread(() -> {
+        for (int i = 0; i < totalTestBlocks; i++) {
+          ExtendedBlock b = new ExtendedBlock(bpos.getBlockPoolId(), i, 0, i);
+          try {
+            fsDataset.createRbw(StorageType.DEFAULT, storageId, b, false);
+            bpos.notifyNamenodeReceivingBlock(b, storageId);
+            fsDataset.finalizeBlock(b, false);
+            Thread.sleep(1);
+          } catch (Exception e) {
+            e.printStackTrace();
+          }
+        }
+      });
+      addNewBlockThread.start();
+
+      // Make sure blocks are being generated and IBRs are pending.
+      Thread.sleep(200);
+      // Trigger re-register using DataNode Command.
+      datanodeCommands[0] = new DatanodeCommand[]{RegisterCommand.REGISTER};
+      bpos.triggerHeartbeatForTests();
+
+      // Wait until the FBR or the IBRs cover all blocks; timeouts propagate.
+      GenericTestUtils.waitFor(
+          () -> fullBlockReportCount == totalTestBlocks
+              || incrBlockReportCount == totalTestBlocks,
+          1000, 15000);
+      assertTrue("FBR count " + fullBlockReportCount + ", IBR count "
+          + incrBlockReportCount + ", expected " + totalTestBlocks,
+          fullBlockReportCount == totalTestBlocks
+              || incrBlockReportCount == totalTestBlocks);
+    } finally {
+      bpos.stop();
+      bpos.join();
+      DataNodeFaultInjector.set(oldInjector);
+    }
+  }
+
   @Test
   public void testLocklessBlockPoolId() throws Exception {
     BPOfferService bpos = Mockito.spy(setupBPOSForNNs(mockNN1));
@@ -615,6 +697,36 @@ public class TestBPOfferService {
       secondCallTime = Time.now();
     }
   }
+
+  /**
+   * Stub mockNN so FBRs for fakeBlock's pool overwrite fullBlockReportCount
+   * and IBRs add to incrBlockReportCount; blocks are summed over storages.
+   */
+  private void countBlockReportItems(final ExtendedBlock fakeBlock,
+      final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
+    final String fakeBlockPoolId = fakeBlock.getBlockPoolId();
+    Mockito.doAnswer((Answer<Object>) invocation -> {
+      int numBlocks = 0;
+      for (StorageBlockReport report :
+          (StorageBlockReport[]) invocation.getArguments()[2]) {
+        numBlocks += report.getBlocks().getNumberOfBlocks();
+      }
+      setBlockReportCount(numBlocks);
+      return null;
+    }).when(mockNN).blockReport(Mockito.any(),
+        Mockito.eq(fakeBlockPoolId), Mockito.any(), Mockito.any());
+
+    Mockito.doAnswer((Answer<Object>) invocation -> {
+      int numBlocks = 0;
+      for (StorageReceivedDeletedBlocks report :
+          (StorageReceivedDeletedBlocks[]) invocation.getArguments()[2]) {
+        numBlocks += report.getBlocks().length;
+      }
+      setIncreaseBlockReportCount(numBlocks);
+      return null;
+    }).when(mockNN).blockReceivedAndDeleted(Mockito.any(),
+        Mockito.eq(fakeBlockPoolId), Mockito.any());
+  }
   
   private class BPOfferServiceSynchronousCallAnswer implements Answer<Void> {
     private final int nnIdx;

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org