You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2019/03/04 18:56:10 UTC

[hadoop] branch branch-3.1 updated: HDFS-14314. fullBlockReportLeaseId should be reset after registering to NN. Contributed by star.

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new d951497  HDFS-14314. fullBlockReportLeaseId should be reset after registering to NN. Contributed by star.
d951497 is described below

commit d951497f57cf6556b0916cad08576481dfe2ae06
Author: Wei-Chiu Chuang <we...@apache.org>
AuthorDate: Mon Mar 4 10:43:44 2019 -0800

    HDFS-14314. fullBlockReportLeaseId should be reset after registering to NN. Contributed by star.
    
    (cherry picked from commit 387dbe587aa66ac99ec5f5b50827ec3e0a327613)
    (cherry picked from commit e58ccca3ce131c955ceb115cd0b75e452eea828b)
---
 .../hdfs/server/datanode/BPServiceActor.java       |   7 +-
 .../hdfs/server/datanode/TestBPOfferService.java   | 125 ++++++++++++++++++++-
 2 files changed, 126 insertions(+), 6 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 6c167f4..00f7157 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -105,6 +105,7 @@ class BPServiceActor implements Runnable {
   private final DataNode dn;
   private final DNConf dnConf;
   private long prevBlockReportId;
+  private long fullBlockReportLeaseId;
   private final SortedSet<Integer> blockReportSizes =
       Collections.synchronizedSortedSet(new TreeSet<>());
   private final int maxDataLength;
@@ -129,6 +130,7 @@ class BPServiceActor implements Runnable {
         dnConf.ibrInterval,
         dn.getMetrics());
     prevBlockReportId = ThreadLocalRandom.current().nextLong();
+    fullBlockReportLeaseId = 0;
     scheduler = new Scheduler(dnConf.heartBeatInterval,
         dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval,
         dnConf.outliersReportIntervalMs);
@@ -616,7 +618,6 @@ class BPServiceActor implements Runnable {
         + "; heartBeatInterval=" + dnConf.heartBeatInterval
         + (lifelineSender != null ?
             "; lifelineIntervalMs=" + dnConf.getLifelineIntervalMs() : ""));
-    long fullBlockReportLeaseId = 0;
 
     //
     // Now loop for a long time....
@@ -783,6 +784,10 @@ class BPServiceActor implements Runnable {
     LOG.info("Block pool " + this + " successfully registered with NN");
     bpos.registrationSucceeded(this, bpRegistration);
 
+    // reset lease id whenever registered to NN.
+    // ask for a new lease id at the next heartbeat.
+    fullBlockReportLeaseId = 0;
+
     // random short delay - helps scatter the BR from all DNs
     scheduler.scheduleBlockReport(dnConf.initialBlockReportDelayMs);
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 4863ca1..4c771a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -27,12 +27,12 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -63,6 +63,8 @@ import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
@@ -92,6 +94,9 @@ public class TestBPOfferService {
   private static final File TEST_BUILD_DATA = PathUtils.getTestDir(TestBPOfferService.class);
   private long firstCallTime = 0; 
   private long secondCallTime = 0;
+  private long firstLeaseId = 0;
+  private long secondLeaseId = 0;
+  private long nextFullBlockReportLeaseId = 1L;
 
   static {
     GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
@@ -171,16 +176,24 @@ public class TestBPOfferService {
   private class HeartbeatAnswer implements Answer<HeartbeatResponse> {
     private final int nnIdx;
 
-    public HeartbeatAnswer(int nnIdx) {
+    HeartbeatAnswer(int nnIdx) {
       this.nnIdx = nnIdx;
     }
 
     @Override
-    public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable {
+    public HeartbeatResponse answer(InvocationOnMock invocation)
+        throws Throwable {
       heartbeatCounts[nnIdx]++;
+      Boolean requestFullBlockReportLease =
+          (Boolean) invocation.getArguments()[8];
+      long fullBlockReportLeaseId = 0;
+      if (requestFullBlockReportLease) {
+        fullBlockReportLeaseId = nextFullBlockReportLeaseId++;
+      }
+      LOG.info("fullBlockReportLeaseId=" + fullBlockReportLeaseId);
       HeartbeatResponse heartbeatResponse = new HeartbeatResponse(
           datanodeCommands[nnIdx], mockHaStatuses[nnIdx], null,
-          ThreadLocalRandom.current().nextLong() | 1L);
+          fullBlockReportLeaseId);
       //reset the command
       datanodeCommands[nnIdx] = new DatanodeCommand[0];
       return heartbeatResponse;
@@ -188,6 +201,24 @@ public class TestBPOfferService {
   }
 
 
+  /** Heartbeat stub replying with one RegisterCommand and lease id 0. */
+  private class HeartbeatRegisterAnswer implements Answer<HeartbeatResponse> {
+    private final int nnIdx;
+
+    HeartbeatRegisterAnswer(int nnIdx) {
+      this.nnIdx = nnIdx;
+    }
+
+    @Override
+    public HeartbeatResponse answer(InvocationOnMock invocation)
+        throws Throwable {
+      heartbeatCounts[nnIdx]++;
+      final DatanodeCommand[] reRegister = {new RegisterCommand()};
+      return new HeartbeatResponse(
+          reRegister, mockHaStatuses[nnIdx], null, 0L);
+    }
+  }
+
   /**
    * Test that the BPOS can register to talk to two different NNs,
    * sends block reports to both, etc.
@@ -523,6 +554,26 @@ public class TestBPOfferService {
     }, 500, 10000);
   }
 
+  /** Waits for exactly {@code times} registerDatanode calls on mockNN. */
+  private void waitForRegistration(
+      final DatanodeProtocolClientSideTranslatorPB mockNN, final int times)
+      throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          // e.g. 2: once from connectToNNAndHandshake, once from reRegister.
+          Mockito.verify(mockNN, Mockito.times(times))
+              .registerDatanode(Mockito.any());
+          return true;
+        } catch (Throwable t) {
+          LOG.info("waiting on block registerDatanode: " + t.getMessage());
+          return false;
+        }
+      }
+    }, 500, 10000);
+  }
+
   private ReceivedDeletedBlockInfo[] waitForBlockReceived(
       final ExtendedBlock fakeBlock,
       final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
@@ -866,7 +917,7 @@ public class TestBPOfferService {
 
   }
 
-  @Test
+  @Test(timeout = 30000)
   public void testRefreshNameNodes() throws Exception {
 
     BPOfferService bpos = setupBPOSForNNs(mockDn, mockNN1, mockNN2);
@@ -935,4 +986,68 @@ public class TestBPOfferService {
       bpos.join();
     }
   }
+
+  /**
+   * After the NN answers a heartbeat with a RegisterCommand, the DN must drop
+   * its full block report lease id and request a new one (HDFS-14314).
+   */
+  @Test(timeout = 15000)
+  public void testRefreshLeaseId() throws Exception {
+    Mockito.when(mockNN1.sendHeartbeat(
+        Mockito.any(DatanodeRegistration.class),
+        Mockito.any(StorageReport[].class),
+        Mockito.anyLong(),
+        Mockito.anyLong(),
+        Mockito.anyInt(),
+        Mockito.anyInt(),
+        Mockito.anyInt(),
+        Mockito.any(VolumeFailureSummary.class),
+        Mockito.anyBoolean(),
+        Mockito.any(SlowPeerReports.class),
+        Mockito.any(SlowDiskReports.class)))
+        // 1st heartbeat: the "old" NN instance hands out a lease id on request
+        .thenAnswer(new HeartbeatAnswer(0))
+        // 2nd heartbeat: the "new" NN instance replies with a RegisterCommand
+        .thenAnswer(new HeartbeatRegisterAnswer(0))
+        .thenAnswer(new HeartbeatAnswer(0));
+
+    Mockito.when(mockNN1.blockReport(
+        Mockito.any(DatanodeRegistration.class),
+        Mockito.anyString(),
+        Mockito.any(StorageBlockReport[].class),
+        Mockito.any(BlockReportContext.class)))
+        .thenAnswer(new Answer<Object>() {
+          @Override
+          public Object answer(InvocationOnMock invocation)
+              throws Throwable {
+            long leaseId = ((BlockReportContext)
+                invocation.getArguments()[3]).getLeaseId();
+            LOG.info("leaseId = " + leaseId);
+            // Lease id 1 is the first one issued, i.e. the pre-registration
+            // lease: reject that report so the DN has to ask for a new id.
+            if (leaseId == 1) {
+              firstLeaseId = leaseId;
+              throw new ConnectException(
+                  "network is not reachable for test. ");
+            }
+            secondLeaseId = leaseId;
+            return null;
+          }
+        });
+
+    BPOfferService bpos = setupBPOSForNNs(mockNN1);
+    bpos.start();
+
+    try {
+      waitForInitialization(bpos);
+      // registerDatanode is expected twice: handshake, then re-registration.
+      waitForRegistration(mockNN1, 2);
+      assertEquals(1L, firstLeaseId);
+      while (secondLeaseId != 2L) {
+        Thread.sleep(1000);
+      }
+    } finally {
+      bpos.stop();
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org