You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2020/07/24 15:00:58 UTC

[GitHub] [hbase] virajjasani commented on a change in pull request #2130: HBASE-24765: Dynamic master discovery

virajjasani commented on a change in pull request #2130:
URL: https://github.com/apache/hbase/pull/2130#discussion_r460075547



##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -115,20 +129,50 @@
   MasterRegistry(Configuration conf) throws IOException {
     this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
       MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
-    int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
       conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
     // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
     // this through the master registry...
     // This is a problem as we will use the cluster id to determine the authentication method
     rpcClient = RpcClientFactory.createClient(conf, null);
     rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-    Set<ServerName> masterAddrs = parseMasterAddrs(conf);
+    // Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters
+    // by fetching the end points from this list.
+    populateMasterStubs(parseMasterAddrs(conf));
+    Runnable masterEndPointRefresher = () -> {
+      while (!Thread.interrupted()) {
+        try {
+          // Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't
+          // have duplicate refreshes because once the thread is past the wait(), notify()s are
+          // ignored until the thread is back to the waiting state.
+          synchronized (refreshMasters) {
+            refreshMasters.wait(WAIT_TIME_OUT_MS);
+          }
+          LOG.debug("Attempting to refresh master address end points.");
+          Set<ServerName> newMasters = new HashSet<>(getMasters().get());
+          populateMasterStubs(newMasters);
+          LOG.debug("Finished refreshing master end points. {}", newMasters);
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted during wait, aborting refresh-masters-thread.", e);
+          break;
+        } catch (ExecutionException | IOException e) {
+          LOG.debug("Error populating latest list of masters.", e);
+        }
+      }
+    };
+    masterAddrRefresherThread = Threads.newDaemonThreadFactory(
+        "MasterRegistry refresh end-points").newThread(masterEndPointRefresher);

Review comment:
       Can you please use guava library's ThreadFactoryBuilder? So far the consensus on [HBASE-24750](https://issues.apache.org/jira/browse/HBASE-24750) is to get rid of our internally maintained ThreadFactory :)

##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -115,20 +129,50 @@
   MasterRegistry(Configuration conf) throws IOException {
     this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
       MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
-    int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
       conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
     // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
     // this through the master registry...
     // This is a problem as we will use the cluster id to determine the authentication method
     rpcClient = RpcClientFactory.createClient(conf, null);
     rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-    Set<ServerName> masterAddrs = parseMasterAddrs(conf);
+    // Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters
+    // by fetching the end points from this list.
+    populateMasterStubs(parseMasterAddrs(conf));
+    Runnable masterEndPointRefresher = () -> {
+      while (!Thread.interrupted()) {
+        try {
+          // Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't
+          // have duplicate refreshes because once the thread is past the wait(), notify()s are
+          // ignored until the thread is back to the waiting state.
+          synchronized (refreshMasters) {
+            refreshMasters.wait(WAIT_TIME_OUT_MS);
+          }
+          LOG.debug("Attempting to refresh master address end points.");
+          Set<ServerName> newMasters = new HashSet<>(getMasters().get());
+          populateMasterStubs(newMasters);
+          LOG.debug("Finished refreshing master end points. {}", newMasters);
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted during wait, aborting refresh-masters-thread.", e);
+          break;
+        } catch (ExecutionException | IOException e) {
+          LOG.debug("Error populating latest list of masters.", e);
+        }
+      }
+    };
+    masterAddrRefresherThread = Threads.newDaemonThreadFactory(
+        "MasterRegistry refresh end-points").newThread(masterEndPointRefresher);
+    masterAddrRefresherThread.start();

Review comment:
       Don't want to use SingleThreadExecutor.submit()?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
##########
@@ -2931,6 +2935,27 @@ public GetActiveMasterResponse getActiveMaster(RpcController rpcController,
     return resp.build();
   }
 
+  @Override
+  public GetMastersResponse getMasters(RpcController rpcController, GetMastersRequest request)
+      throws ServiceException {
+    GetMastersResponse.Builder resp = GetMastersResponse.newBuilder();
+    // Active master
+    Optional<ServerName> serverName = master.getActiveMaster();
+    serverName.ifPresent(name -> resp.addMasterServers(GetMastersResponseEntry.newBuilder()
+        .setServerName(ProtobufUtil.toServerName(name)).setIsActive(true).build()));
+    // Backup masters
+    try {
+      // TODO: Cache the backup masters to avoid a ZK RPC for each getMasters() call.

Review comment:
       We are planning to have a cache with ZKWatcher for backupMasters ZNode right? I believe as of now, we don't subscribe for any event.

##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
##########
@@ -126,4 +131,46 @@ public void testRegistryRPCs() throws Exception {
       }
     }
   }
+
+  /**
+   * Tests that the list of masters configured in the MasterRegistry is dynamically refreshed in the
+   * event of errors.
+   */
+  @Test
+  public void testDynamicMasterConfigurationRefresh() throws Exception {
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    String currentMasterAddrs = Preconditions.checkNotNull(conf.get(HConstants.MASTER_ADDRS_KEY));
+    HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
+    String clusterId = activeMaster.getClusterId();
+    // Add a non-working master
+    ServerName badServer = ServerName.valueOf("localhost", 1234, -1);
+    conf.set(HConstants.MASTER_ADDRS_KEY, badServer.toShortString() + "," + currentMasterAddrs);
+    // Set the hedging fan out so that all masters are queried.
+    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4);
+    try (MasterRegistry registry = new MasterRegistry(conf)) {
+      final Set<ServerName> masters = registry.getParsedMasterServers();
+      assertTrue(masters.contains(badServer));
+      // Make a registry RPC, this should trigger a refresh since one of the hedged RPC fails.
+      assertEquals(registry.getClusterId().get(), clusterId);
+      // Wait for new set of masters to be populated.
+      TEST_UTIL.waitFor(5000,
+          (Waiter.Predicate<Exception>) () -> !registry.getParsedMasterServers().equals(masters));
+      // new set of masters should not include the bad server
+      final Set<ServerName> newMasters = registry.getParsedMasterServers();
+      // Bad one should be out.
+      assertEquals(3, newMasters.size());
+      assertFalse(newMasters.contains(badServer));
+      // Kill the active master
+      activeMaster.stopMaster();
+      TEST_UTIL.waitFor(10000,
+        () -> TEST_UTIL.getMiniHBaseCluster().getLiveMasterThreads().size() == 2);
+      assertEquals(registry.getClusterId().get(), clusterId);
+      // Make another registry RPC call which should trigger another refresh.
+      TEST_UTIL.waitFor(100000, (Waiter.Predicate<Exception>) () ->
+          registry.getParsedMasterServers().size() == 2);
+      final Set<ServerName> newMasters2 = registry.getParsedMasterServers();
+      assertEquals(2, newMasters2.size());
+      assertFalse(newMasters2.contains(activeMaster));

Review comment:
       `newMasters2.contains(activeMaster.getServerName())`

##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
##########
@@ -126,4 +131,46 @@ public void testRegistryRPCs() throws Exception {
       }
     }
   }
+
+  /**
+   * Tests that the list of masters configured in the MasterRegistry is dynamically refreshed in the
+   * event of errors.
+   */
+  @Test
+  public void testDynamicMasterConfigurationRefresh() throws Exception {
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    String currentMasterAddrs = Preconditions.checkNotNull(conf.get(HConstants.MASTER_ADDRS_KEY));
+    HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
+    String clusterId = activeMaster.getClusterId();
+    // Add a non-working master
+    ServerName badServer = ServerName.valueOf("localhost", 1234, -1);
+    conf.set(HConstants.MASTER_ADDRS_KEY, badServer.toShortString() + "," + currentMasterAddrs);
+    // Set the hedging fan out so that all masters are queried.
+    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4);
+    try (MasterRegistry registry = new MasterRegistry(conf)) {
+      final Set<ServerName> masters = registry.getParsedMasterServers();
+      assertTrue(masters.contains(badServer));
+      // Make a registry RPC, this should trigger a refresh since one of the hedged RPC fails.
+      assertEquals(registry.getClusterId().get(), clusterId);
+      // Wait for new set of masters to be populated.
+      TEST_UTIL.waitFor(5000,
+          (Waiter.Predicate<Exception>) () -> !registry.getParsedMasterServers().equals(masters));

Review comment:
       nit: upto you if you want to use `ExplainingPredicate` to throw Exception with specific message

##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -170,6 +214,11 @@ public static String getMasterAddr(Configuration conf) throws UnknownHostExcepti
     callable.call(controller, stub, resp -> {
       if (controller.failed()) {
         future.completeExceptionally(controller.getFailed());
+        // RPC has failed, trigger a refresh of master end points. We can have some spurious
+        // refreshes, but that is okay since the RPC is not expensive and not in a hot path.
+        synchronized (refreshMasters) {
+          refreshMasters.notify();

Review comment:
       For any generic RPC failure, we want to expedite populating masters with another RPC call.
   
   Let's say there are some sequence of events:
   1. getClusterId() RPC call failed
   2. master refresher thread was in `waiting` state, so we notify it and it will trigger getMasters() call
   3. the call fails again and we `notify` refreshMasters but no one is waiting on it, notify is ignored
   4. master refresher thread again waits for 5 min before populating masters.
   
   Do we really want step 4 to wait for 5 min (assuming no other RPC call happens and masters list is stale)? Maybe we can expedite populating masters with the help of AtomicBoolean check (and also avoid `synchronized + wait` calls i.e 5 min wait)?
   
   Even if we have network issue, we don't want to delay populate masters by 5 min right?

##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -115,20 +129,50 @@
   MasterRegistry(Configuration conf) throws IOException {
     this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
       MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
-    int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
       conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
     // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
     // this through the master registry...
     // This is a problem as we will use the cluster id to determine the authentication method
     rpcClient = RpcClientFactory.createClient(conf, null);
     rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-    Set<ServerName> masterAddrs = parseMasterAddrs(conf);
+    // Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters
+    // by fetching the end points from this list.
+    populateMasterStubs(parseMasterAddrs(conf));
+    Runnable masterEndPointRefresher = () -> {
+      while (!Thread.interrupted()) {
+        try {
+          // Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't
+          // have duplicate refreshes because once the thread is past the wait(), notify()s are
+          // ignored until the thread is back to the waiting state.
+          synchronized (refreshMasters) {
+            refreshMasters.wait(WAIT_TIME_OUT_MS);
+          }
+          LOG.debug("Attempting to refresh master address end points.");
+          Set<ServerName> newMasters = new HashSet<>(getMasters().get());
+          populateMasterStubs(newMasters);
+          LOG.debug("Finished refreshing master end points. {}", newMasters);
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted during wait, aborting refresh-masters-thread.", e);
+          break;
+        } catch (ExecutionException | IOException e) {
+          LOG.debug("Error populating latest list of masters.", e);
+        }
+      }
+    };
+    masterAddrRefresherThread = Threads.newDaemonThreadFactory(
+        "MasterRegistry refresh end-points").newThread(masterEndPointRefresher);

Review comment:
       nit: avoid space in Thread prefix name?

##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
##########
@@ -126,4 +131,46 @@ public void testRegistryRPCs() throws Exception {
       }
     }
   }
+
+  /**
+   * Tests that the list of masters configured in the MasterRegistry is dynamically refreshed in the
+   * event of errors.
+   */
+  @Test
+  public void testDynamicMasterConfigurationRefresh() throws Exception {
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    String currentMasterAddrs = Preconditions.checkNotNull(conf.get(HConstants.MASTER_ADDRS_KEY));
+    HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
+    String clusterId = activeMaster.getClusterId();
+    // Add a non-working master
+    ServerName badServer = ServerName.valueOf("localhost", 1234, -1);
+    conf.set(HConstants.MASTER_ADDRS_KEY, badServer.toShortString() + "," + currentMasterAddrs);
+    // Set the hedging fan out so that all masters are queried.
+    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4);
+    try (MasterRegistry registry = new MasterRegistry(conf)) {
+      final Set<ServerName> masters = registry.getParsedMasterServers();
+      assertTrue(masters.contains(badServer));
+      // Make a registry RPC, this should trigger a refresh since one of the hedged RPC fails.
+      assertEquals(registry.getClusterId().get(), clusterId);
+      // Wait for new set of masters to be populated.
+      TEST_UTIL.waitFor(5000,
+          (Waiter.Predicate<Exception>) () -> !registry.getParsedMasterServers().equals(masters));
+      // new set of masters should not include the bad server
+      final Set<ServerName> newMasters = registry.getParsedMasterServers();
+      // Bad one should be out.
+      assertEquals(3, newMasters.size());
+      assertFalse(newMasters.contains(badServer));
+      // Kill the active master
+      activeMaster.stopMaster();
+      TEST_UTIL.waitFor(10000,
+        () -> TEST_UTIL.getMiniHBaseCluster().getLiveMasterThreads().size() == 2);
+      assertEquals(registry.getClusterId().get(), clusterId);
+      // Make another registry RPC call which should trigger another refresh.
+      TEST_UTIL.waitFor(100000, (Waiter.Predicate<Exception>) () ->
+          registry.getParsedMasterServers().size() == 2);
+      final Set<ServerName> newMasters2 = registry.getParsedMasterServers();
+      assertEquals(2, newMasters2.size());

Review comment:
       After stopping activeMaster, maybe add an extra check to confirm list contains one Active and one Backup?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org