You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2019/03/04 18:56:10 UTC
[hadoop] branch branch-3.1 updated: HDFS-14314.
fullBlockReportLeaseId should be reset after registering to NN. Contributed
by star.
This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new d951497 HDFS-14314. fullBlockReportLeaseId should be reset after registering to NN. Contributed by star.
d951497 is described below
commit d951497f57cf6556b0916cad08576481dfe2ae06
Author: Wei-Chiu Chuang <we...@apache.org>
AuthorDate: Mon Mar 4 10:43:44 2019 -0800
HDFS-14314. fullBlockReportLeaseId should be reset after registering to NN. Contributed by star.
(cherry picked from commit 387dbe587aa66ac99ec5f5b50827ec3e0a327613)
(cherry picked from commit e58ccca3ce131c955ceb115cd0b75e452eea828b)
---
.../hdfs/server/datanode/BPServiceActor.java | 7 +-
.../hdfs/server/datanode/TestBPOfferService.java | 125 ++++++++++++++++++++-
2 files changed, 126 insertions(+), 6 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 6c167f4..00f7157 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -105,6 +105,7 @@ class BPServiceActor implements Runnable {
private final DataNode dn;
private final DNConf dnConf;
private long prevBlockReportId;
+ private long fullBlockReportLeaseId;
private final SortedSet<Integer> blockReportSizes =
Collections.synchronizedSortedSet(new TreeSet<>());
private final int maxDataLength;
@@ -129,6 +130,7 @@ class BPServiceActor implements Runnable {
dnConf.ibrInterval,
dn.getMetrics());
prevBlockReportId = ThreadLocalRandom.current().nextLong();
+ fullBlockReportLeaseId = 0;
scheduler = new Scheduler(dnConf.heartBeatInterval,
dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval,
dnConf.outliersReportIntervalMs);
@@ -616,7 +618,6 @@ class BPServiceActor implements Runnable {
+ "; heartBeatInterval=" + dnConf.heartBeatInterval
+ (lifelineSender != null ?
"; lifelineIntervalMs=" + dnConf.getLifelineIntervalMs() : ""));
- long fullBlockReportLeaseId = 0;
//
// Now loop for a long time....
@@ -783,6 +784,10 @@ class BPServiceActor implements Runnable {
LOG.info("Block pool " + this + " successfully registered with NN");
bpos.registrationSucceeded(this, bpRegistration);
+ // reset lease id whenever registered to NN.
+ // ask for a new lease id at the next heartbeat.
+ fullBlockReportLeaseId = 0;
+
// random short delay - helps scatter the BR from all DNs
scheduler.scheduleBlockReport(dnConf.initialBlockReportDelayMs);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 4863ca1..4c771a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -27,12 +27,12 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
+import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -63,6 +63,8 @@ import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
@@ -92,6 +94,9 @@ public class TestBPOfferService {
private static final File TEST_BUILD_DATA = PathUtils.getTestDir(TestBPOfferService.class);
private long firstCallTime = 0;
private long secondCallTime = 0;
+ private long firstLeaseId = 0;
+ private long secondLeaseId = 0;
+ private long nextFullBlockReportLeaseId = 1L;
static {
GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
@@ -171,16 +176,24 @@ public class TestBPOfferService {
private class HeartbeatAnswer implements Answer<HeartbeatResponse> {
private final int nnIdx;
- public HeartbeatAnswer(int nnIdx) {
+ HeartbeatAnswer(int nnIdx) {
this.nnIdx = nnIdx;
}
@Override
- public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable {
+ public HeartbeatResponse answer(InvocationOnMock invocation)
+ throws Throwable {
heartbeatCounts[nnIdx]++;
+ Boolean requestFullBlockReportLease =
+ (Boolean) invocation.getArguments()[8];
+ long fullBlockReportLeaseId = 0;
+ if (requestFullBlockReportLease) {
+ fullBlockReportLeaseId = nextFullBlockReportLeaseId++;
+ }
+ LOG.info("fullBlockReportLeaseId=" + fullBlockReportLeaseId);
HeartbeatResponse heartbeatResponse = new HeartbeatResponse(
datanodeCommands[nnIdx], mockHaStatuses[nnIdx], null,
- ThreadLocalRandom.current().nextLong() | 1L);
+ fullBlockReportLeaseId);
//reset the command
datanodeCommands[nnIdx] = new DatanodeCommand[0];
return heartbeatResponse;
@@ -188,6 +201,24 @@ public class TestBPOfferService {
}
+  /**
+   * Mock heartbeat answer whose response carries exactly one
+   * {@link RegisterCommand} and a full block report lease id of 0.
+   * Each invocation is also counted in {@code heartbeatCounts} for the
+   * configured NameNode index.
+   */
+  private class HeartbeatRegisterAnswer implements Answer<HeartbeatResponse> {
+    private final int nnIdx;
+
+    HeartbeatRegisterAnswer(int nnIdx) {
+      this.nnIdx = nnIdx;
+    }
+
+    @Override
+    public HeartbeatResponse answer(InvocationOnMock invocation)
+        throws Throwable {
+      heartbeatCounts[nnIdx] += 1;
+      // The only command handed back is the register request.
+      final DatanodeCommand[] registerOnly = {new RegisterCommand()};
+      // Lease id 0: this response grants no full block report lease.
+      final long noLeaseId = 0L;
+      return new HeartbeatResponse(
+          registerOnly, mockHaStatuses[nnIdx], null, noLeaseId);
+    }
+  }
+
/**
* Test that the BPOS can register to talk to two different NNs,
* sends block reports to both, etc.
@@ -523,6 +554,26 @@ public class TestBPOfferService {
}, 500, 10000);
}
+  /**
+   * Blocks until the mocked NameNode has received the expected number of
+   * {@code registerDatanode} calls.
+   *
+   * @param mockNN the mocked NameNode translator to verify against
+   * @param times the exact number of {@code registerDatanode} invocations
+   *     to wait for
+   * @throws Exception if the wait helper fails, e.g. because the
+   *     verification does not succeed in time
+   */
+  private void waitForRegistration(
+      final DatanodeProtocolClientSideTranslatorPB mockNN, final int times)
+      throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          // Honor the caller's expected count instead of a hard-coded 2.
+          // In the re-registration scenario the caller passes 2: the first
+          // call comes from connectToNNAndHandshake, the second from
+          // reRegister.
+          Mockito.verify(mockNN, Mockito.times(times))
+              .registerDatanode(Mockito.any());
+          return true;
+        } catch (Throwable t) {
+          // A failed Mockito verification is thrown rather than returned,
+          // so Throwable is caught on purpose; it only means "not yet".
+          LOG.info("waiting on block registerDatanode: " + t.getMessage());
+          return false;
+        }
+      }
+    }, 500, 10000);
+  }
+
private ReceivedDeletedBlockInfo[] waitForBlockReceived(
final ExtendedBlock fakeBlock,
final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
@@ -866,7 +917,7 @@ public class TestBPOfferService {
}
- @Test
+ @Test(timeout = 30000)
public void testRefreshNameNodes() throws Exception {
BPOfferService bpos = setupBPOSForNNs(mockDn, mockNN1, mockNN2);
@@ -935,4 +986,68 @@ public class TestBPOfferService {
bpos.join();
}
}
+
+  /**
+   * Verifies that the DataNode drops its full block report lease id when it
+   * registers again and asks for a fresh one on a later heartbeat.
+   *
+   * Scripted scenario against a single mocked NameNode:
+   * the first heartbeat hands out lease id 1, the block report carrying
+   * lease id 1 is rejected with a ConnectException, the second heartbeat
+   * returns a RegisterCommand, and later heartbeats hand out the next lease
+   * id. The test passes once a block report arrives with lease id 2.
+   */
+  @Test(timeout = 15000)
+  public void testRefreshLeaseId() throws Exception {
+    Mockito.when(mockNN1.sendHeartbeat(
+        Mockito.any(DatanodeRegistration.class),
+        Mockito.any(StorageReport[].class),
+        Mockito.anyLong(),
+        Mockito.anyLong(),
+        Mockito.anyInt(),
+        Mockito.anyInt(),
+        Mockito.anyInt(),
+        Mockito.any(VolumeFailureSummary.class),
+        Mockito.anyBoolean(),
+        Mockito.any(SlowPeerReports.class),
+        Mockito.any(SlowDiskReports.class)))
+        // heartbeat to old NN instance
+        .thenAnswer(new HeartbeatAnswer(0))
+        // heartbeat to new NN instance with Register Command
+        .thenAnswer(new HeartbeatRegisterAnswer(0))
+        .thenAnswer(new HeartbeatAnswer(0));
+
+    Mockito.when(mockNN1.blockReport(
+        Mockito.any(DatanodeRegistration.class),
+        Mockito.anyString(),
+        Mockito.any(StorageBlockReport[].class),
+        Mockito.any(BlockReportContext.class)))
+        .thenAnswer(
+            // Parameterized instead of the raw Answer type.
+            new Answer<Object>() {
+              @Override
+              public Object answer(InvocationOnMock invocation)
+                  throws Throwable {
+                BlockReportContext context =
+                    (BlockReportContext) invocation.getArguments()[3];
+                long leaseId = context.getLeaseId();
+                LOG.info("leaseId = " + leaseId);
+
+                // leaseId == 1 means the DN sent its block report with the
+                // old lease id: reject it and wait until the DN requests a
+                // new lease id.
+                if (leaseId == 1) {
+                  firstLeaseId = leaseId;
+                  throw new ConnectException(
+                      "network is not reachable for test. ");
+                } else {
+                  secondLeaseId = leaseId;
+                  return null;
+                }
+              }
+            });
+
+    BPOfferService bpos = setupBPOSForNNs(mockNN1);
+    bpos.start();
+
+    try {
+      waitForInitialization(bpos);
+      // Should call registration 2 times
+      waitForRegistration(mockNN1, 2);
+      assertEquals(1L, firstLeaseId);
+      // NOTE(review): firstLeaseId/secondLeaseId are plain (non-volatile)
+      // fields that presumably get written from the mocked RPC callback on
+      // another thread - confirm visibility is adequate for this poll.
+      // The loop is bounded only by the @Test timeout above.
+      while (secondLeaseId != 2L) {
+        Thread.sleep(1000);
+      }
+    } finally {
+      bpos.stop();
+    }
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org