You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by br...@apache.org on 2018/01/16 11:20:23 UTC
[4/5] hadoop git commit: HDFS-8693. refreshNamenodes does not support
adding a new standby to a running DN. Contributed by Ajith S.
HDFS-8693. refreshNamenodes does not support adding a new standby to a running DN. Contributed by Ajith S.
(cherry picked from commit 880b9d24ff7b5f350ec99bac9b0862009460b486)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/28f69755
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/28f69755
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/28f69755
Branch: refs/heads/branch-2.9
Commit: 28f69755f15ee959b00277a2c173f8b65cebeddb
Parents: 4e28aa4
Author: Brahma Reddy Battula <br...@apache.org>
Authored: Tue Jan 16 16:13:19 2018 +0530
Committer: Brahma Reddy Battula <br...@apache.org>
Committed: Tue Jan 16 16:33:34 2018 +0530
----------------------------------------------------------------------
.../hdfs/server/datanode/BPOfferService.java | 24 ++++--
.../server/datanode/TestBPOfferService.java | 77 +++++++++++++++++++-
2 files changed, 93 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/28f69755/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 31bcd3c..8b154c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -146,11 +146,25 @@ class BPOfferService {
}
Set<InetSocketAddress> newAddrs = Sets.newHashSet(addrs);
- if (!Sets.symmetricDifference(oldAddrs, newAddrs).isEmpty()) {
- // Keep things simple for now -- we can implement this at a later date.
- throw new IOException(
- "HA does not currently support adding a new standby to a running DN. " +
- "Please do a rolling restart of DNs to reconfigure the list of NNs.");
+ // Process added NNs
+ Set<InetSocketAddress> addedNNs = Sets.difference(newAddrs, oldAddrs);
+ for (InetSocketAddress addedNN : addedNNs) {
+ BPServiceActor actor = new BPServiceActor(addedNN,
+ lifelineAddrs.get(addrs.indexOf(addedNN)), this);
+ actor.start();
+ bpServices.add(actor);
+ }
+
+ // Process removed NNs
+ Set<InetSocketAddress> removedNNs = Sets.difference(oldAddrs, newAddrs);
+ for (InetSocketAddress removedNN : removedNNs) {
+ for (BPServiceActor actor : bpServices) {
+ if (actor.getNNSocketAddress().equals(removedNN)) {
+ actor.stop();
+ shutdownActor(actor);
+ break;
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/28f69755/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index ec19926..4863ca1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -99,10 +100,10 @@ public class TestBPOfferService {
private DatanodeProtocolClientSideTranslatorPB mockNN1;
private DatanodeProtocolClientSideTranslatorPB mockNN2;
private final NNHAStatusHeartbeat[] mockHaStatuses =
- new NNHAStatusHeartbeat[2];
+ new NNHAStatusHeartbeat[3];
private final DatanodeCommand[][] datanodeCommands =
- new DatanodeCommand[2][0];
- private final int[] heartbeatCounts = new int[2];
+ new DatanodeCommand[3][0];
+ private final int[] heartbeatCounts = new int[3];
private DataNode mockDn;
private FsDatasetSpi<?> mockFSDataset;
@@ -864,4 +865,74 @@ public class TestBPOfferService {
assertNotNull(bpos.getActiveNN());
}
+
+ @Test
+ public void testRefreshNameNodes() throws Exception {
+
+ BPOfferService bpos = setupBPOSForNNs(mockDn, mockNN1, mockNN2);
+
+ bpos.start();
+ try {
+ waitForBothActors(bpos);
+
+ // The DN should have registered with both NNs.
+ Mockito.verify(mockNN1)
+ .registerDatanode(Mockito.any(DatanodeRegistration.class));
+ Mockito.verify(mockNN2)
+ .registerDatanode(Mockito.any(DatanodeRegistration.class));
+
+ // Should get block reports from both NNs
+ waitForBlockReport(mockNN1);
+ waitForBlockReport(mockNN2);
+
+ // When we receive a block, it should report it to both NNs
+ bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false);
+
+ ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK,
+ mockNN1);
+ assertEquals(1, ret.length);
+ assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock());
+
+ ret = waitForBlockReceived(FAKE_BLOCK, mockNN2);
+ assertEquals(1, ret.length);
+ assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock());
+
+ // add new standby
+ DatanodeProtocolClientSideTranslatorPB mockNN3 = setupNNMock(2);
+ Mockito.doReturn(mockNN3).when(mockDn)
+ .connectToNN(Mockito.eq(new InetSocketAddress(2)));
+
+ ArrayList<InetSocketAddress> addrs = new ArrayList<>();
+ ArrayList<InetSocketAddress> lifelineAddrs = new ArrayList<>(
+ addrs.size());
+ // mockNN1
+ addrs.add(new InetSocketAddress(0));
+ lifelineAddrs.add(null);
+ // mockNN3
+ addrs.add(new InetSocketAddress(2));
+ lifelineAddrs.add(null);
+
+ bpos.refreshNNList(addrs, lifelineAddrs);
+
+ assertEquals(2, bpos.getBPServiceActors().size());
+ // wait for handshake to run
+ Thread.sleep(1000);
+
+ // verify new NN registered
+ Mockito.verify(mockNN3)
+ .registerDatanode(Mockito.any(DatanodeRegistration.class));
+
+ // When we receive a block, it should report it to both NNs
+ bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false);
+
+ // verify the new NN received the block report
+ ret = waitForBlockReceived(FAKE_BLOCK, mockNN3);
+ assertEquals(1, ret.length);
+ assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock());
+
+ } finally {
+ bpos.stop();
+ bpos.join();
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org