You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2020/08/28 08:53:10 UTC

[hbase] branch master updated: HBASE-24760 Add a config hbase.rsgroup.fallback.enable for RSGroup fallback feature (#2149)

This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 7909e29  HBASE-24760 Add a config hbase.rsgroup.fallback.enable for RSGroup fallback feature (#2149)
7909e29 is described below

commit 7909e29de5c5b54cbcc1bda60c3345333de3df81
Author: XinSun <dd...@gmail.com>
AuthorDate: Fri Aug 28 16:52:45 2020 +0800

    HBASE-24760 Add a config hbase.rsgroup.fallback.enable for RSGroup fallback feature (#2149)
    
    Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
 .../hbase/rsgroup/RSGroupBasedLoadBalancer.java    | 135 +++++++++------------
 .../hadoop/hbase/rsgroup/TestRSGroupsBase.java     |   6 +
 .../hadoop/hbase/rsgroup/TestRSGroupsFallback.java |  69 ++++++-----
 3 files changed, 101 insertions(+), 109 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
index 7e81f75..50ddb41 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
@@ -19,10 +19,8 @@ package org.apache.hadoop.hbase.rsgroup;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -51,7 +49,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
-import org.apache.hbase.thirdparty.com.google.common.collect.LinkedListMultimap;
 import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
@@ -81,15 +78,15 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
   private LoadBalancer internalBalancer;
 
   /**
-   * Define the config key of fallback groups
-   * Enabled only if this property is set
+   * Set this key to {@code true} to allow region fallback.
+   * Regions fall back to the default rsgroup first, and then to any group if the default
+   * rsgroup has no online servers.
    * Please keep balancer switch on at the same time, which is relied on to correct misplaced
    * regions
    */
-  public static final String FALLBACK_GROUPS_KEY = "hbase.rsgroup.fallback.groups";
+  public static final String FALLBACK_GROUP_ENABLE_KEY = "hbase.rsgroup.fallback.enable";
 
   private boolean fallbackEnabled = false;
-  private Set<String> fallbackGroups;
 
   /**
    * Used by reflection in {@link org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory}.
@@ -180,22 +177,14 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
   public Map<ServerName, List<RegionInfo>> roundRobinAssignment(
       List<RegionInfo> regions, List<ServerName> servers) throws IOException {
     Map<ServerName, List<RegionInfo>> assignments = Maps.newHashMap();
-    ListMultimap<String, RegionInfo> regionMap = ArrayListMultimap.create();
-    ListMultimap<String, ServerName> serverMap = ArrayListMultimap.create();
-    generateGroupMaps(regions, servers, regionMap, serverMap);
-    for (String groupKey : regionMap.keySet()) {
-      if (regionMap.get(groupKey).size() > 0) {
-        Map<ServerName, List<RegionInfo>> result = this.internalBalancer
-          .roundRobinAssignment(regionMap.get(groupKey), serverMap.get(groupKey));
-        if (result != null) {
-          if (result.containsKey(LoadBalancer.BOGUS_SERVER_NAME) &&
-            assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
-            assignments.get(LoadBalancer.BOGUS_SERVER_NAME)
-              .addAll(result.get(LoadBalancer.BOGUS_SERVER_NAME));
-          } else {
-            assignments.putAll(result);
-          }
-        }
+    List<Pair<List<RegionInfo>, List<ServerName>>> pairs =
+        generateGroupAssignments(regions, servers);
+    for (Pair<List<RegionInfo>, List<ServerName>> pair : pairs) {
+      Map<ServerName, List<RegionInfo>> result = this.internalBalancer
+          .roundRobinAssignment(pair.getFirst(), pair.getSecond());
+      if (result != null) {
+        result.forEach((server, regionInfos) ->
+            assignments.computeIfAbsent(server, s -> Lists.newArrayList()).addAll(regionInfos));
       }
     }
     return assignments;
@@ -206,36 +195,16 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
     List<ServerName> servers) throws HBaseIOException {
     try {
       Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
-      ListMultimap<String, RegionInfo> groupToRegion = ArrayListMultimap.create();
-      RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
-      for (RegionInfo region : regions.keySet()) {
-        String groupName =
-          RSGroupUtil.getRSGroupInfo(masterServices, rsGroupInfoManager, region.getTable())
-              .orElse(defaultInfo).getName();
-        groupToRegion.put(groupName, region);
-      }
-      for (String group : groupToRegion.keySet()) {
-        Map<RegionInfo, ServerName> currentAssignmentMap = new TreeMap<RegionInfo, ServerName>();
-        List<RegionInfo> regionList = groupToRegion.get(group);
-        RSGroupInfo info = rsGroupInfoManager.getRSGroup(group);
-        List<ServerName> candidateList = filterOfflineServers(info, servers);
-        if (fallbackEnabled && candidateList.isEmpty()) {
-          candidateList = getFallBackCandidates(servers);
-        }
-        for (RegionInfo region : regionList) {
-          currentAssignmentMap.put(region, regions.get(region));
-        }
-        if (candidateList.size() > 0) {
-          assignments
-            .putAll(this.internalBalancer.retainAssignment(currentAssignmentMap, candidateList));
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("No available servers for group {} to assign regions: {}", group,
-                RegionInfo.getShortNameToLog(regionList));
-          }
-          assignments.computeIfAbsent(LoadBalancer.BOGUS_SERVER_NAME, s -> new ArrayList<>())
-            .addAll(regionList);
-        }
+      List<Pair<List<RegionInfo>, List<ServerName>>> pairs =
+          generateGroupAssignments(Lists.newArrayList(regions.keySet()), servers);
+      for (Pair<List<RegionInfo>, List<ServerName>> pair : pairs) {
+        List<RegionInfo> regionList = pair.getFirst();
+        Map<RegionInfo, ServerName> currentAssignmentMap = Maps.newTreeMap();
+        regionList.forEach(r -> currentAssignmentMap.put(r, regions.get(r)));
+        Map<ServerName, List<RegionInfo>> pairResult =
+            this.internalBalancer.retainAssignment(currentAssignmentMap, pair.getSecond());
+        pairResult.forEach((server, rs) ->
+            assignments.computeIfAbsent(server, s -> Lists.newArrayList()).addAll(rs));
       }
       return assignments;
     } catch (IOException e) {
@@ -246,17 +215,17 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
   @Override
   public ServerName randomAssignment(RegionInfo region,
       List<ServerName> servers) throws IOException {
-    ListMultimap<String,RegionInfo> regionMap = LinkedListMultimap.create();
-    ListMultimap<String,ServerName> serverMap = LinkedListMultimap.create();
-    generateGroupMaps(Lists.newArrayList(region), servers, regionMap, serverMap);
-    List<ServerName> filteredServers = serverMap.get(regionMap.keySet().iterator().next());
+    List<Pair<List<RegionInfo>, List<ServerName>>> pairs =
+        generateGroupAssignments(Lists.newArrayList(region), servers);
+    List<ServerName> filteredServers = pairs.iterator().next().getSecond();
     return this.internalBalancer.randomAssignment(region, filteredServers);
   }
 
-  private void generateGroupMaps(List<RegionInfo> regions, List<ServerName> servers,
-    ListMultimap<String, RegionInfo> regionMap, ListMultimap<String, ServerName> serverMap)
-    throws HBaseIOException {
+  private List<Pair<List<RegionInfo>, List<ServerName>>> generateGroupAssignments(
+      List<RegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
     try {
+      ListMultimap<String, RegionInfo> regionMap = ArrayListMultimap.create();
+      ListMultimap<String, ServerName> serverMap = ArrayListMultimap.create();
       RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
       for (RegionInfo region : regions) {
         String groupName =
@@ -267,15 +236,29 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
       for (String groupKey : regionMap.keySet()) {
         RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupKey);
         serverMap.putAll(groupKey, filterOfflineServers(info, servers));
-        if (fallbackEnabled && serverMap.get(groupKey).isEmpty()) {
-          serverMap.putAll(groupKey, getFallBackCandidates(servers));
-        }
+      }
+
+      List<Pair<List<RegionInfo>, List<ServerName>>> result = Lists.newArrayList();
+      List<RegionInfo> fallbackRegions = Lists.newArrayList();
+      for (String groupKey : regionMap.keySet()) {
         if (serverMap.get(groupKey).isEmpty()) {
-          serverMap.put(groupKey, LoadBalancer.BOGUS_SERVER_NAME);
+          fallbackRegions.addAll(regionMap.get(groupKey));
+        } else {
+          result.add(Pair.newPair(regionMap.get(groupKey), serverMap.get(groupKey)));
+        }
+      }
+      if (!fallbackRegions.isEmpty()) {
+        List<ServerName> candidates = null;
+        if (fallbackEnabled) {
+          candidates = getFallBackCandidates(servers);
         }
+        candidates = (candidates == null || candidates.isEmpty()) ?
+          Lists.newArrayList(BOGUS_SERVER_NAME) : candidates;
+        result.add(Pair.newPair(fallbackRegions, candidates));
       }
+      return result;
     } catch(IOException e) {
-      throw new HBaseIOException("Failed to generate group maps", e);
+      throw new HBaseIOException("Failed to generate group assignments", e);
     }
   }
 
@@ -390,11 +373,7 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
     }
     internalBalancer.initialize();
     // init fallback groups
-    Collection<String> groups = config.getTrimmedStringCollection(FALLBACK_GROUPS_KEY);
-    if (groups != null && !groups.isEmpty()) {
-      this.fallbackEnabled = true;
-      this.fallbackGroups = new HashSet<>(groups);
-    }
+    this.fallbackEnabled = config.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false);
   }
 
   public boolean isOnline() {
@@ -485,15 +464,13 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
   }
 
   private List<ServerName> getFallBackCandidates(List<ServerName> servers) {
-    List<ServerName> serverNames = new ArrayList<>();
-    for (String fallbackGroup : fallbackGroups) {
-      try {
-        RSGroupInfo info = rsGroupInfoManager.getRSGroup(fallbackGroup);
-        serverNames.addAll(filterOfflineServers(info, servers));
-      } catch (IOException e) {
-        LOG.error("Get group info for {} failed", fallbackGroup, e);
-      }
+    List<ServerName> serverNames = null;
+    try {
+      RSGroupInfo info = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
+      serverNames = filterOfflineServers(info, servers);
+    } catch (IOException e) {
+      LOG.error("Failed to get default rsgroup info to fallback", e);
     }
-    return serverNames;
+    return serverNames == null || serverNames.isEmpty() ? servers : serverNames;
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java
index e5cd3a1..91b9262 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java
@@ -206,6 +206,12 @@ public abstract class TestRSGroupsBase {
       }
     }
     ADMIN.setRSGroup(tables, RSGroupInfo.DEFAULT_GROUP);
+    for (NamespaceDescriptor nd : ADMIN.listNamespaceDescriptors()) {
+      if (groupName.equals(nd.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP))) {
+        nd.removeConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP);
+        ADMIN.modifyNamespace(nd);
+      }
+    }
     RSGroupInfo groupInfo = ADMIN.getRSGroup(groupName);
     ADMIN.moveServersToRSGroup(groupInfo.getServers(), RSGroupInfo.DEFAULT_GROUP);
     ADMIN.removeRSGroup(groupName);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsFallback.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsFallback.java
index 2d05f55..ea5e226 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsFallback.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsFallback.java
@@ -24,6 +24,8 @@ import java.util.Collections;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
@@ -32,6 +34,7 @@ import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RSGroupTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -56,8 +59,8 @@ public class TestRSGroupsFallback extends TestRSGroupsBase {
 
   @BeforeClass
   public static void setUp() throws Exception {
-    Configuration configuration = TEST_UTIL.getConfiguration();
-    configuration.set(RSGroupBasedLoadBalancer.FALLBACK_GROUPS_KEY, FALLBACK_GROUP);
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean(RSGroupBasedLoadBalancer.FALLBACK_GROUP_ENABLE_KEY, true);
     setUpTestBeforeClass();
     MASTER.balanceSwitch(true);
   }
@@ -78,51 +81,57 @@ public class TestRSGroupsFallback extends TestRSGroupsBase {
   }
 
   @Test
-  public void testGroupFallback() throws Exception {
+  public void testFallback() throws Exception {
     // add fallback group
     addGroup(FALLBACK_GROUP, 1);
     // add test group
     String groupName = getGroupName(name.getMethodName());
     addGroup(groupName, 1);
     TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
-        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).build())
-        .setRegionServerGroup(groupName)
-        .build();
-    ADMIN.createTable(desc);
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).build())
+      .setRegionServerGroup(groupName)
+      .build();
+    ADMIN.createTable(desc, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
     TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
-    // server of test group crash
-    for (Address server : ADMIN.getRSGroup(groupName).getServers()) {
-      AssignmentTestingUtil.crashRs(TEST_UTIL, getServerName(server), true);
-    }
-    Threads.sleep(1000);
-    TEST_UTIL.waitUntilNoRegionsInTransition(10000);
-    TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
-
-    // regions move to fallback group
-    assertRegionsInGroup(FALLBACK_GROUP);
+    // crash the servers of the test group; regions should move to the default group
+    crashRsInGroup(groupName);
+    assertRegionsInGroup(tableName, RSGroupInfo.DEFAULT_GROUP);
 
-    // move a new server from default group
-    Address address = ADMIN.getRSGroup(RSGroupInfo.DEFAULT_GROUP).getServers().first();
-    ADMIN.moveServersToRSGroup(Collections.singleton(address), groupName);
+    // crash the servers of the default group; regions should move to any other group
+    crashRsInGroup(RSGroupInfo.DEFAULT_GROUP);
+    assertRegionsInGroup(tableName, FALLBACK_GROUP);
 
-    // correct misplaced regions
+    // add a new server to default group, regions move to default group
+    TEST_UTIL.getMiniHBaseCluster().startRegionServerAndWait(60000);
     MASTER.balance();
+    assertRegionsInGroup(tableName, RSGroupInfo.DEFAULT_GROUP);
 
-    TEST_UTIL.waitUntilNoRegionsInTransition(10000);
-    TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
-
-    // regions move back
-    assertRegionsInGroup(groupName);
+    // add a new server to test group, regions move back
+    JVMClusterUtil.RegionServerThread t =
+      TEST_UTIL.getMiniHBaseCluster().startRegionServerAndWait(60000);
+    ADMIN.moveServersToRSGroup(
+      Collections.singleton(t.getRegionServer().getServerName().getAddress()), groupName);
+    MASTER.balance();
+    assertRegionsInGroup(tableName, groupName);
 
     TEST_UTIL.deleteTable(tableName);
   }
 
-  private void assertRegionsInGroup(String group) throws IOException {
-    RSGroupInfo fallbackGroup = ADMIN.getRSGroup(group);
-    MASTER.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName).forEach(region -> {
+  private void assertRegionsInGroup(TableName table, String group) throws IOException {
+    TEST_UTIL.waitUntilAllRegionsAssigned(table);
+    RSGroupInfo rsGroup = ADMIN.getRSGroup(group);
+    MASTER.getAssignmentManager().getRegionStates().getRegionsOfTable(table).forEach(region -> {
       Address regionOnServer = MASTER.getAssignmentManager().getRegionStates()
           .getRegionAssignments().get(region).getAddress();
-      assertTrue(fallbackGroup.getServers().contains(regionOnServer));
+      assertTrue(rsGroup.getServers().contains(regionOnServer));
     });
   }
+
+  private void crashRsInGroup(String groupName) throws Exception {
+    for (Address server : ADMIN.getRSGroup(groupName).getServers()) {
+      AssignmentTestingUtil.crashRs(TEST_UTIL, getServerName(server), true);
+    }
+    Threads.sleep(1000);
+    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
+  }
 }