You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by br...@apache.org on 2018/01/16 11:20:23 UTC

[4/5] hadoop git commit: HDFS-8693. refreshNamenodes does not support adding a new standby to a running DN. Contributed by Ajith S.

HDFS-8693. refreshNamenodes does not support adding a new standby to a running DN. Contributed by Ajith S.

(cherry picked from commit 880b9d24ff7b5f350ec99bac9b0862009460b486)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/28f69755
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/28f69755
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/28f69755

Branch: refs/heads/branch-2.9
Commit: 28f69755f15ee959b00277a2c173f8b65cebeddb
Parents: 4e28aa4
Author: Brahma Reddy Battula <br...@apache.org>
Authored: Tue Jan 16 16:13:19 2018 +0530
Committer: Brahma Reddy Battula <br...@apache.org>
Committed: Tue Jan 16 16:33:34 2018 +0530

----------------------------------------------------------------------
 .../hdfs/server/datanode/BPOfferService.java    | 24 ++++--
 .../server/datanode/TestBPOfferService.java     | 77 +++++++++++++++++++-
 2 files changed, 93 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/28f69755/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 31bcd3c..8b154c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -146,11 +146,25 @@ class BPOfferService {
     }
     Set<InetSocketAddress> newAddrs = Sets.newHashSet(addrs);
     
-    if (!Sets.symmetricDifference(oldAddrs, newAddrs).isEmpty()) {
-      // Keep things simple for now -- we can implement this at a later date.
-      throw new IOException(
-          "HA does not currently support adding a new standby to a running DN. " +
-          "Please do a rolling restart of DNs to reconfigure the list of NNs.");
+    // Process added NNs
+    Set<InetSocketAddress> addedNNs = Sets.difference(newAddrs, oldAddrs);
+    for (InetSocketAddress addedNN : addedNNs) {
+      BPServiceActor actor = new BPServiceActor(addedNN,
+          lifelineAddrs.get(addrs.indexOf(addedNN)), this);
+      actor.start();
+      bpServices.add(actor);
+    }
+
+    // Process removed NNs
+    Set<InetSocketAddress> removedNNs = Sets.difference(oldAddrs, newAddrs);
+    for (InetSocketAddress removedNN : removedNNs) {
+      for (BPServiceActor actor : bpServices) {
+        if (actor.getNNSocketAddress().equals(removedNN)) {
+          actor.stop();
+          shutdownActor(actor);
+          break;
+        }
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/28f69755/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index ec19926..4863ca1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -99,10 +100,10 @@ public class TestBPOfferService {
   private DatanodeProtocolClientSideTranslatorPB mockNN1;
   private DatanodeProtocolClientSideTranslatorPB mockNN2;
   private final NNHAStatusHeartbeat[] mockHaStatuses =
-      new NNHAStatusHeartbeat[2];
+      new NNHAStatusHeartbeat[3];
   private final DatanodeCommand[][] datanodeCommands =
-      new DatanodeCommand[2][0];
-  private final int[] heartbeatCounts = new int[2];
+      new DatanodeCommand[3][0];
+  private final int[] heartbeatCounts = new int[3];
   private DataNode mockDn;
   private FsDatasetSpi<?> mockFSDataset;
   
@@ -864,4 +865,74 @@ public class TestBPOfferService {
     assertNotNull(bpos.getActiveNN());
 
   }
+
+  @Test
+  public void testRefreshNameNodes() throws Exception {
+
+    BPOfferService bpos = setupBPOSForNNs(mockDn, mockNN1, mockNN2);
+
+    bpos.start();
+    try {
+      waitForBothActors(bpos);
+
+      // The DN should have registered with both NNs.
+      Mockito.verify(mockNN1)
+          .registerDatanode(Mockito.any(DatanodeRegistration.class));
+      Mockito.verify(mockNN2)
+          .registerDatanode(Mockito.any(DatanodeRegistration.class));
+
+      // Should get block reports from both NNs
+      waitForBlockReport(mockNN1);
+      waitForBlockReport(mockNN2);
+
+      // When we receive a block, it should report it to both NNs
+      bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false);
+
+      ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK,
+          mockNN1);
+      assertEquals(1, ret.length);
+      assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock());
+
+      ret = waitForBlockReceived(FAKE_BLOCK, mockNN2);
+      assertEquals(1, ret.length);
+      assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock());
+
+      // add new standby
+      DatanodeProtocolClientSideTranslatorPB mockNN3 = setupNNMock(2);
+      Mockito.doReturn(mockNN3).when(mockDn)
+          .connectToNN(Mockito.eq(new InetSocketAddress(2)));
+
+      ArrayList<InetSocketAddress> addrs = new ArrayList<>();
+      ArrayList<InetSocketAddress> lifelineAddrs = new ArrayList<>(
+          addrs.size());
+      // mockNN1
+      addrs.add(new InetSocketAddress(0));
+      lifelineAddrs.add(null);
+      // mockNN3
+      addrs.add(new InetSocketAddress(2));
+      lifelineAddrs.add(null);
+
+      bpos.refreshNNList(addrs, lifelineAddrs);
+
+      assertEquals(2, bpos.getBPServiceActors().size());
+      // wait for handshake to run
+      Thread.sleep(1000);
+
+      // verify new NN registered
+      Mockito.verify(mockNN3)
+          .registerDatanode(Mockito.any(DatanodeRegistration.class));
+
+      // When we receive a block, it should report it to both NNs
+      bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false);
+
+      // verify new NN received block report
+      ret = waitForBlockReceived(FAKE_BLOCK, mockNN3);
+      assertEquals(1, ret.length);
+      assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock());
+
+    } finally {
+      bpos.stop();
+      bpos.join();
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org