You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2019/03/04 19:42:52 UTC

[hadoop] branch branch-2.9 updated: HDFS-14314. fullBlockReportLeaseId should be reset after registering to NN. Contributed by star.

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 58f490a  HDFS-14314. fullBlockReportLeaseId should be reset after registering to NN. Contributed by star.
58f490a is described below

commit 58f490a34ea3de9d8d8ad8e5c35e13e992d2e9b0
Author: Wei-Chiu Chuang <we...@apache.org>
AuthorDate: Mon Mar 4 10:43:44 2019 -0800

    HDFS-14314. fullBlockReportLeaseId should be reset after registering to NN. Contributed by star.
    
    (cherry picked from commit 387dbe587aa66ac99ec5f5b50827ec3e0a327613)
    (cherry picked from commit e58ccca3ce131c955ceb115cd0b75e452eea828b)
    (cherry picked from commit d951497f57cf6556b0916cad08576481dfe2ae06)
    (cherry picked from commit e23a448e0e32ce5139b76d47c73fce621ccb66bd)
    (cherry picked from commit d71cfe14610ab471651b721c28f5488987b5e17c)
---
 .../hdfs/server/datanode/BPServiceActor.java       |   7 +-
 .../hdfs/server/datanode/TestBPOfferService.java   | 125 ++++++++++++++++++++-
 2 files changed, 126 insertions(+), 6 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index c260272..84dbd06 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -105,6 +105,7 @@ class BPServiceActor implements Runnable {
   private final DataNode dn;
   private final DNConf dnConf;
   private long prevBlockReportId;
+  private long fullBlockReportLeaseId;
   private final SortedSet<Integer> blockReportSizes =
       Collections.synchronizedSortedSet(new TreeSet<Integer>());
   private final int maxDataLength;
@@ -129,6 +130,7 @@ class BPServiceActor implements Runnable {
         dnConf.ibrInterval,
         dn.getMetrics());
     prevBlockReportId = ThreadLocalRandom.current().nextLong();
+    fullBlockReportLeaseId = 0;
     scheduler = new Scheduler(dnConf.heartBeatInterval,
         dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval,
         dnConf.outliersReportIntervalMs);
@@ -616,7 +618,6 @@ class BPServiceActor implements Runnable {
         + "; heartBeatInterval=" + dnConf.heartBeatInterval
         + (lifelineSender != null ?
             "; lifelineIntervalMs=" + dnConf.getLifelineIntervalMs() : ""));
-    long fullBlockReportLeaseId = 0;
 
     //
     // Now loop for a long time....
@@ -782,6 +783,10 @@ class BPServiceActor implements Runnable {
     LOG.info("Block pool " + this + " successfully registered with NN");
     bpos.registrationSucceeded(this, bpRegistration);
 
+    // reset lease id whenever registered to NN.
+    // ask for a new lease id at the next heartbeat.
+    fullBlockReportLeaseId = 0;
+
     // random short delay - helps scatter the BR from all DNs
     scheduler.scheduleBlockReport(dnConf.initialBlockReportDelayMs);
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 4863ca1..3e3c0c6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -27,12 +27,12 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -63,6 +63,8 @@ import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
@@ -92,6 +94,9 @@ public class TestBPOfferService {
   private static final File TEST_BUILD_DATA = PathUtils.getTestDir(TestBPOfferService.class);
   private long firstCallTime = 0; 
   private long secondCallTime = 0;
+  private long firstLeaseId = 0;
+  private long secondLeaseId = 0;
+  private long nextFullBlockReportLeaseId = 1L;
 
   static {
     GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
@@ -171,16 +176,24 @@ public class TestBPOfferService {
   private class HeartbeatAnswer implements Answer<HeartbeatResponse> {
     private final int nnIdx;
 
-    public HeartbeatAnswer(int nnIdx) {
+    HeartbeatAnswer(int nnIdx) {
       this.nnIdx = nnIdx;
     }
 
     @Override
-    public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable {
+    public HeartbeatResponse answer(InvocationOnMock invocation)
+        throws Throwable {
       heartbeatCounts[nnIdx]++;
+      Boolean requestFullBlockReportLease =
+          (Boolean) invocation.getArguments()[8];
+      long fullBlockReportLeaseId = 0;
+      if (requestFullBlockReportLease) {
+        fullBlockReportLeaseId = nextFullBlockReportLeaseId++;
+      }
+      LOG.info("fullBlockReportLeaseId=" + fullBlockReportLeaseId);
       HeartbeatResponse heartbeatResponse = new HeartbeatResponse(
           datanodeCommands[nnIdx], mockHaStatuses[nnIdx], null,
-          ThreadLocalRandom.current().nextLong() | 1L);
+          fullBlockReportLeaseId);
       //reset the command
       datanodeCommands[nnIdx] = new DatanodeCommand[0];
       return heartbeatResponse;
@@ -188,6 +201,24 @@ public class TestBPOfferService {
   }
 
 
+  private class HeartbeatRegisterAnswer implements Answer<HeartbeatResponse> {
+    /** Index of the mocked NN whose heartbeat counter is incremented. */
+    private final int nnIdx;
+    HeartbeatRegisterAnswer(int idx) {
+      nnIdx = idx;
+    }
+
+    /** Replies with a single RegisterCommand and a zero lease id. */
+    @Override
+    public HeartbeatResponse answer(InvocationOnMock invocation)
+        throws Throwable {
+      heartbeatCounts[nnIdx] += 1;
+      return new HeartbeatResponse(
+          new DatanodeCommand[] {new RegisterCommand()},
+          mockHaStatuses[nnIdx], null, 0L);
+    }
+  }
+
   /**
    * Test that the BPOS can register to talk to two different NNs,
    * sends block reports to both, etc.
@@ -523,6 +554,26 @@ public class TestBPOfferService {
     }, 500, 10000);
   }
 
+  private void waitForRegistration(
+      final DatanodeProtocolClientSideTranslatorPB mockNN, final int times)
+      throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          // Passes once registerDatanode() was invoked exactly `times` times
+          // (e.g. 2: connectToNNAndHandshake, then reRegister).
+          Mockito.verify(mockNN, Mockito.times(times))
+              .registerDatanode(Mockito.any(DatanodeRegistration.class));
+          return true;
+        } catch (Throwable t) {
+          LOG.info("waiting on block registerDatanode: " + t.getMessage());
+          return false;
+        }
+      }
+    }, 500, 10000);
+  }
+
   private ReceivedDeletedBlockInfo[] waitForBlockReceived(
       final ExtendedBlock fakeBlock,
       final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
@@ -866,7 +917,7 @@ public class TestBPOfferService {
 
   }
 
-  @Test
+  @Test(timeout = 30000)
   public void testRefreshNameNodes() throws Exception {
 
     BPOfferService bpos = setupBPOSForNNs(mockDn, mockNN1, mockNN2);
@@ -935,4 +986,68 @@ public class TestBPOfferService {
       bpos.join();
     }
   }
+
+  @Test(timeout = 15000)
+  public void testRefreshLeaseId() throws Exception {
+    Mockito.when(mockNN1.sendHeartbeat(
+        Mockito.any(DatanodeRegistration.class),
+        Mockito.any(StorageReport[].class),
+        Mockito.anyLong(),
+        Mockito.anyLong(),
+        Mockito.anyInt(),
+        Mockito.anyInt(),
+        Mockito.anyInt(),
+        Mockito.any(VolumeFailureSummary.class),
+        Mockito.anyBoolean(),
+        Mockito.any(SlowPeerReports.class),
+        Mockito.any(SlowDiskReports.class)))
+        // heartbeat to the old NN instance
+        .thenAnswer(new HeartbeatAnswer(0))
+        // heartbeat to the new NN instance, answered with a RegisterCommand
+        .thenAnswer(new HeartbeatRegisterAnswer(0))
+        .thenAnswer(new HeartbeatAnswer(0));
+
+    Mockito.when(mockNN1.blockReport(
+        Mockito.any(DatanodeRegistration.class),
+        Mockito.anyString(),
+        Mockito.any(StorageBlockReport[].class),
+        Mockito.any(BlockReportContext.class)))
+        .thenAnswer(new Answer<Object>() {
+          @Override
+          public Object answer(InvocationOnMock invocation) throws Throwable {
+            BlockReportContext context =
+                (BlockReportContext) invocation.getArguments()[3];
+            long leaseId = context.getLeaseId();
+            LOG.info("leaseId = " + leaseId);
+            // leaseId == 1 means the DN reported with its old lease id;
+            // reject it and wait until the DN requests a new lease id.
+            if (leaseId == 1) {
+              firstLeaseId = leaseId;
+              throw new ConnectException(
+                  "network is not reachable for test. ");
+            }
+            secondLeaseId = leaseId;
+            return null;
+          }
+        });
+
+    BPOfferService bpos = setupBPOSForNNs(mockNN1);
+    bpos.start();
+
+    try {
+      waitForInitialization(bpos);
+      // registerDatanode: once at handshake, once more after RegisterCommand
+      waitForRegistration(mockNN1, 2);
+      assertEquals(1L, firstLeaseId);
+      // the DN must eventually report with the freshly granted lease id 2
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return secondLeaseId == 2L;
+        }
+      }, 100, 10000);
+    } finally {
+      bpos.stop();
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org