You are viewing a plain-text version of this content. Its canonical link (originally a hyperlink on the word "here") is not preserved in this copy.
Posted to commits@hbase.apache.org by vj...@apache.org on 2021/08/17 13:56:14 UTC
[hbase] branch branch-1 updated: HBASE-25849 Backport HBASE-22738,
HBASE-24760 & HBASE-25298 to branch-1 (#3581)
This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1 by this push:
new ecac266 HBASE-25849 Backport HBASE-22738, HBASE-24760 & HBASE-25298 to branch-1 (#3581)
ecac266 is described below
commit ecac2666331b5e48a6ba8ec0bf251236fffb7359
Author: caroliney14 <ca...@berkeley.edu>
AuthorDate: Tue Aug 17 06:55:47 2021 -0700
HBASE-25849 Backport HBASE-22738, HBASE-24760 & HBASE-25298 to branch-1 (#3581)
Signed-off-by: Andrew Purtell <ap...@apache.org>
Signed-off-by: Bharath Vissapragada <bh...@apache.org>
Signed-off-by: Viraj Jasani <vj...@apache.org>
---
.../hbase/rsgroup/RSGroupBasedLoadBalancer.java | 188 +++++++++++---------
.../hbase/rsgroup/RSGroupInfoManagerImpl.java | 5 +-
.../balancer/TestRSGroupBasedLoadBalancer.java | 17 ++
.../hadoop/hbase/rsgroup/TestRSGroupsBase.java | 17 ++
.../hadoop/hbase/rsgroup/TestRSGroupsFallback.java | 194 +++++++++++++++++++++
.../org/apache/hadoop/hbase/MiniHBaseCluster.java | 29 +++
6 files changed, 369 insertions(+), 81 deletions(-)
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
index a408833..23c75cc 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
@@ -21,7 +21,6 @@
package org.apache.hadoop.hbase.rsgroup;
import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -52,6 +51,7 @@ import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.ReflectionUtils;
/**
@@ -81,6 +81,17 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
private RSGroupInfoManager infoManager;
private LoadBalancer internalBalancer;
+ /**
+ * Set this key to {@code true} to allow region fallback.
+ * Regions fall back to the default rsgroup first, and then to any group if the default rsgroup
+ * has no online servers.
+ * Keep the balancer switched on as well; it is relied on to correct misplaced
+ * regions.
+ */
+ public static final String FALLBACK_GROUP_ENABLE_KEY = "hbase.rsgroup.fallback.enable";
+
+ private boolean fallbackEnabled = false;
+
//used during reflection by LoadBalancerFactory
@InterfaceAudience.Private
public RSGroupBasedLoadBalancer() {
@@ -133,11 +144,16 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
}
Map<ServerName,List<HRegionInfo>> correctedState = correctAssignments(clusterState);
- List<RegionPlan> regionPlans = new ArrayList<RegionPlan>();
+ List<RegionPlan> regionPlans = new ArrayList<>();
List<HRegionInfo> misplacedRegions = correctedState.get(LoadBalancer.BOGUS_SERVER_NAME);
for (HRegionInfo regionInfo : misplacedRegions) {
- regionPlans.add(new RegionPlan(regionInfo, null, null));
+ if (fallbackEnabled) {
+ regionPlans.add(new RegionPlan(regionInfo, findServerForRegion(clusterState, regionInfo),
+ null));
+ } else {
+ regionPlans.add(new RegionPlan(regionInfo, null, null));
+ }
}
try {
// Record which region servers have been processed,so as to skip them after processed
@@ -172,23 +188,19 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(
List<HRegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
Map<ServerName, List<HRegionInfo>> assignments = Maps.newHashMap();
- ListMultimap<String,HRegionInfo> regionMap = ArrayListMultimap.create();
- ListMultimap<String,ServerName> serverMap = ArrayListMultimap.create();
- generateGroupMaps(regions, servers, regionMap, serverMap);
- for(String groupKey : regionMap.keySet()) {
- if (regionMap.get(groupKey).size() > 0) {
- Map<ServerName, List<HRegionInfo>> result =
- this.internalBalancer.roundRobinAssignment(
- regionMap.get(groupKey),
- serverMap.get(groupKey));
- if(result != null) {
- if(result.containsKey(LoadBalancer.BOGUS_SERVER_NAME) &&
- assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)){
- assignments.get(LoadBalancer.BOGUS_SERVER_NAME).addAll(
- result.get(LoadBalancer.BOGUS_SERVER_NAME));
- } else {
- assignments.putAll(result);
+ List<Pair<List<HRegionInfo>, List<ServerName>>> pairs =
+ generateGroupAssignments(regions, servers);
+ for (Pair<List<HRegionInfo>, List<ServerName>> pair : pairs) {
+ Map<ServerName, List<HRegionInfo>> result = this.internalBalancer
+ .roundRobinAssignment(pair.getFirst(), pair.getSecond());
+ if (result != null) {
+ for (Map.Entry<ServerName, List<HRegionInfo>> entry : result.entrySet()) {
+ ServerName serverName = entry.getKey();
+ List<HRegionInfo> regionInfos = entry.getValue();
+ if (!assignments.containsKey(serverName)) {
+ assignments.put(serverName, Lists.<HRegionInfo>newArrayList());
}
+ assignments.get(serverName).addAll(regionInfos);
}
}
}
@@ -199,56 +211,24 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
public Map<ServerName, List<HRegionInfo>> retainAssignment(
Map<HRegionInfo, ServerName> regions, List<ServerName> servers) throws HBaseIOException {
try {
- Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<ServerName, List<HRegionInfo>>();
- ListMultimap<String, HRegionInfo> groupToRegion = ArrayListMultimap.create();
- Set<HRegionInfo> misplacedRegions = getMisplacedRegions(regions);
- for (HRegionInfo region : regions.keySet()) {
- if (!misplacedRegions.contains(region)) {
- String groupName = infoManager.getRSGroupOfTable(region.getTable());
- if (groupName == null) {
- LOG.debug("Group not found for table " + region.getTable() + ", using default");
- groupName = RSGroupInfo.DEFAULT_GROUP;
- }
- groupToRegion.put(groupName, region);
+ Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<>();
+ List<Pair<List<HRegionInfo>, List<ServerName>>> pairs =
+ generateGroupAssignments(Lists.newArrayList(regions.keySet()), servers);
+ for (Pair<List<HRegionInfo>, List<ServerName>> pair : pairs) {
+ List<HRegionInfo> regionList = pair.getFirst();
+ Map<HRegionInfo, ServerName> currentAssignmentMap = Maps.newTreeMap();
+ for (HRegionInfo regionInfo: regionList) {
+ currentAssignmentMap.put(regionInfo, regions.get(regionInfo));
}
- }
- // Now the "groupToRegion" map has only the regions which have correct
- // assignments.
- for (String key : groupToRegion.keySet()) {
- Map<HRegionInfo, ServerName> currentAssignmentMap = new TreeMap<HRegionInfo, ServerName>();
- List<HRegionInfo> regionList = groupToRegion.get(key);
- RSGroupInfo info = infoManager.getRSGroup(key);
- List<ServerName> candidateList = filterOfflineServers(info, servers);
- for (HRegionInfo region : regionList) {
- currentAssignmentMap.put(region, regions.get(region));
- }
- if(candidateList.size() > 0) {
- assignments.putAll(this.internalBalancer.retainAssignment(
- currentAssignmentMap, candidateList));
- }
- }
-
- for (HRegionInfo region : misplacedRegions) {
- String groupName = infoManager.getRSGroupOfTable(region.getTable());
- if (groupName == null) {
- LOG.debug("Group not found for table " + region.getTable() + ", using default");
- groupName = RSGroupInfo.DEFAULT_GROUP;
- }
- RSGroupInfo info = infoManager.getRSGroup(groupName);
- List<ServerName> candidateList = filterOfflineServers(info, servers);
- ServerName server = this.internalBalancer.randomAssignment(region,
- candidateList);
- if (server != null) {
- if (!assignments.containsKey(server)) {
- assignments.put(server, new ArrayList<HRegionInfo>());
+ Map<ServerName, List<HRegionInfo>> pairResult =
+ this.internalBalancer.retainAssignment(currentAssignmentMap, pair.getSecond());
+ for (Map.Entry<ServerName, List<HRegionInfo>> entry : pairResult.entrySet()) {
+ ServerName serverName = entry.getKey();
+ List<HRegionInfo> regionInfos = entry.getValue();
+ if (!assignments.containsKey(serverName)) {
+ assignments.put(serverName, Lists.<HRegionInfo>newArrayList());
}
- assignments.get(server).add(region);
- } else {
- //if not server is available assign to bogus so it ends up in RIT
- if(!assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
- assignments.put(LoadBalancer.BOGUS_SERVER_NAME, new ArrayList<HRegionInfo>());
- }
- assignments.get(LoadBalancer.BOGUS_SERVER_NAME).add(region);
+ assignments.get(serverName).addAll(regionInfos);
}
}
return assignments;
@@ -266,19 +246,17 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
@Override
public ServerName randomAssignment(HRegionInfo region,
List<ServerName> servers) throws HBaseIOException {
- ListMultimap<String,HRegionInfo> regionMap = LinkedListMultimap.create();
- ListMultimap<String,ServerName> serverMap = LinkedListMultimap.create();
- generateGroupMaps(Lists.newArrayList(region), servers, regionMap, serverMap);
- List<ServerName> filteredServers = serverMap.get(regionMap.keySet().iterator().next());
+ List<Pair<List<HRegionInfo>, List<ServerName>>> pairs =
+ generateGroupAssignments(Lists.newArrayList(region), servers);
+ List<ServerName> filteredServers = pairs.iterator().next().getSecond();
return this.internalBalancer.randomAssignment(region, filteredServers);
}
- private void generateGroupMaps(
- List<HRegionInfo> regions,
- List<ServerName> servers,
- ListMultimap<String, HRegionInfo> regionMap,
- ListMultimap<String, ServerName> serverMap) throws HBaseIOException {
+ private List<Pair<List<HRegionInfo>, List<ServerName>>> generateGroupAssignments(
+ List<HRegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
try {
+ ListMultimap<String, HRegionInfo> regionMap = ArrayListMultimap.create();
+ ListMultimap<String, ServerName> serverMap = ArrayListMultimap.create();
for (HRegionInfo region : regions) {
String groupName = infoManager.getRSGroupOfTable(region.getTable());
if (groupName == null) {
@@ -290,12 +268,29 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
for (String groupKey : regionMap.keySet()) {
RSGroupInfo info = infoManager.getRSGroup(groupKey);
serverMap.putAll(groupKey, filterOfflineServers(info, servers));
- if(serverMap.get(groupKey).size() < 1) {
- serverMap.put(groupKey, LoadBalancer.BOGUS_SERVER_NAME);
+ }
+
+ List<Pair<List<HRegionInfo>, List<ServerName>>> result = Lists.newArrayList();
+ List<HRegionInfo> fallbackRegions = Lists.newArrayList();
+ for (String groupKey : regionMap.keySet()) {
+ if (serverMap.get(groupKey).isEmpty()) {
+ fallbackRegions.addAll(regionMap.get(groupKey));
+ } else {
+ result.add(Pair.newPair(regionMap.get(groupKey), serverMap.get(groupKey)));
}
}
+ if (!fallbackRegions.isEmpty()) {
+ List<ServerName> candidates = null;
+ if (isFallbackEnabled()) {
+ candidates = getFallBackCandidates(servers);
+ }
+ candidates = (candidates == null || candidates.isEmpty()) ?
+ Lists.newArrayList(BOGUS_SERVER_NAME) : candidates;
+ result.add(Pair.newPair(fallbackRegions, candidates));
+ }
+ return result;
} catch(IOException e) {
- throw new HBaseIOException("Failed to generate group maps", e);
+ throw new HBaseIOException("Failed to generate group assignments", e);
}
}
@@ -368,6 +363,18 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
return misplacedRegions;
}
+ private ServerName findServerForRegion(
+ Map<ServerName, List<HRegionInfo>> existingAssignments, HRegionInfo region) {
+ for (Map.Entry<ServerName, List<HRegionInfo>> entry : existingAssignments.entrySet()) {
+ if (entry.getValue().contains(region)) {
+ return entry.getKey();
+ }
+ }
+
+ throw new IllegalStateException("Could not find server for region "
+ + region.getShortNameToLog());
+ }
+
private Map<ServerName, List<HRegionInfo>> correctAssignments(
Map<ServerName, List<HRegionInfo>> existingAssignments) {
Map<ServerName, List<HRegionInfo>> correctAssignments =
@@ -434,12 +441,18 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
internalBalancer.setMasterServices(masterServices);
internalBalancer.setConf(config);
internalBalancer.initialize();
+ // init fallback groups
+ this.fallbackEnabled = config.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false);
}
public boolean isOnline() {
return infoManager != null && infoManager.isOnline();
}
+ public boolean isFallbackEnabled() {
+ return fallbackEnabled;
+ }
+
@Override
public void regionOnline(HRegionInfo regionInfo, ServerName sn) {
}
@@ -450,6 +463,12 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
@Override
public void onConfigurationChange(Configuration conf) {
+ boolean newFallbackEnabled = conf.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false);
+ if (fallbackEnabled != newFallbackEnabled) {
+ LOG.info("Changing the value of " + FALLBACK_GROUP_ENABLE_KEY + " from " + fallbackEnabled
+ + " to " + newFallbackEnabled);
+ fallbackEnabled = newFallbackEnabled;
+ }
internalBalancer.onConfigurationChange(conf);
}
@@ -470,4 +489,15 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalanc
public void updateBalancerStatus(boolean status) {
internalBalancer.updateBalancerStatus(status);
}
+
+ private List<ServerName> getFallBackCandidates(List<ServerName> servers) {
+ List<ServerName> serverNames = null;
+ try {
+ RSGroupInfo info = infoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
+ serverNames = filterOfflineServers(info, servers);
+ } catch (IOException e) {
+ LOG.error("Failed to get default rsgroup info to fallback", e);
+ }
+ return serverNames == null || serverNames.isEmpty() ? servers : serverNames;
+ }
}
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
index 6799e69..ce108a2 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
@@ -503,12 +503,13 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
}
private synchronized void updateDefaultServers(
- Set<Address> server) throws IOException {
+ Set<Address> server) {
RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP);
RSGroupInfo newInfo = new RSGroupInfo(info.getName(), server, info.getTables());
HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
newGroupMap.put(newInfo.getName(), newInfo);
- flushConfig(newGroupMap);
+ // do not need to persist, as we do not persist default group.
+ rsGroupMap = Collections.unmodifiableMap(newGroupMap);
}
@Override
diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java
index 6170cc1..6cbdc88 100644
--- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java
+++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java
@@ -186,4 +186,21 @@ public class TestRSGroupBasedLoadBalancer extends RSGroupableBalancerTestBase {
.roundRobinAssignment(regions, onlineServers);
assertEquals(bogusRegion, assignments.get(LoadBalancer.BOGUS_SERVER_NAME).size());
}
+
+ @Test
+ public void testOnConfigurationChange() {
+ // fallbackEnabled default is false
+ assertFalse(loadBalancer.isFallbackEnabled());
+
+ // change FALLBACK_GROUP_ENABLE_KEY from false to true
+ Configuration conf = loadBalancer.getConf();
+ conf.setBoolean(RSGroupBasedLoadBalancer.FALLBACK_GROUP_ENABLE_KEY, true);
+ loadBalancer.onConfigurationChange(conf);
+ assertTrue(loadBalancer.isFallbackEnabled());
+
+ // restore
+ conf.setBoolean(RSGroupBasedLoadBalancer.FALLBACK_GROUP_ENABLE_KEY, false);
+ loadBalancer.onConfigurationChange(conf);
+ assertFalse(loadBalancer.isFallbackEnabled());
+ }
}
diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java
index 3d19df0..1b30c4f 100644
--- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java
+++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
public abstract class TestRSGroupsBase {
protected static final Log LOG = LogFactory.getLog(TestRSGroupsBase.class);
@@ -102,6 +103,7 @@ public abstract class TestRSGroupsBase {
admin = TEST_UTIL.getHBaseAdmin();
cluster = TEST_UTIL.getHBaseCluster();
master = ((MiniHBaseCluster)cluster).getMaster();
+ master.balanceSwitch(true);
//wait for balancer to come online
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
@@ -277,6 +279,21 @@ public abstract class TestRSGroupsBase {
return groupPrefix+"_"+baseName+"_"+rand.nextInt(Integer.MAX_VALUE);
}
+ /**
+ * Server addresses stored in a group carry no start code; this method looks up the running region
+ * server with the given address and returns its full ServerName, or null if none matches.
+ */
+ protected final ServerName getServerName(Address addr) {
+ for (JVMClusterUtil.RegionServerThread rsThread:
+ TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
+ ServerName sn = rsThread.getRegionServer().getServerName();
+ if (sn.getAddress().equals(addr)) {
+ return sn;
+ }
+ }
+ return null;
+ }
+
public static class CPMasterObserver extends BaseMasterObserver {
boolean preBalanceRSGroupCalled = false;
boolean postBalanceRSGroupCalled = false;
diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsFallback.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsFallback.java
new file mode 100644
index 0000000..2d4d72f
--- /dev/null
+++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsFallback.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.rsgroup;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.ProcedureInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ MediumTests.class })
+public class TestRSGroupsFallback extends TestRSGroupsBase {
+ protected static final Logger LOG = LoggerFactory.getLogger(TestRSGroupsFallback.class);
+
+ private static final String FALLBACK_GROUP = "fallback";
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ TEST_UTIL = new HBaseTestingUtility();
+ TEST_UTIL.getConfiguration().setBoolean(RSGroupBasedLoadBalancer.FALLBACK_GROUP_ENABLE_KEY,
+ true);
+ TEST_UTIL.getConfiguration().setFloat(
+ "hbase.master.balancer.stochastic.tableSkewCost", 6000);
+ TEST_UTIL.getConfiguration().set(
+ HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
+ RSGroupBasedLoadBalancer.class.getName());
+ TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
+ RSGroupAdminEndpoint.class.getName() + "," + CPMasterObserver.class.getName());
+ TEST_UTIL.getConfiguration().setBoolean(
+ HConstants.ZOOKEEPER_USEMULTI,
+ true);
+ TEST_UTIL.startMiniCluster(NUM_SLAVES_BASE - 1);
+ TEST_UTIL.getConfiguration().setInt(
+ ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
+ NUM_SLAVES_BASE - 1);
+ TEST_UTIL.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+ initialize();
+ master.balanceSwitch(true);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ tearDownAfterClass();
+ }
+
+ @Before
+ public void beforeMethod() throws Exception {
+ setUpBeforeMethod();
+ }
+
+ @After
+ public void afterMethod() throws Exception {
+ tearDownAfterMethod();
+ }
+
+ @Test
+ public void testFallback() throws Exception {
+ // add fallback group
+ addGroup(rsGroupAdmin, FALLBACK_GROUP, 1);
+ // add test group
+ String groupName = "appInfo";
+ RSGroupInfo appInfo = addGroup(rsGroupAdmin, groupName, 1);
+ final TableName tableName = TableName.valueOf(tablePrefix + "_ns", "_testFallback");
+ admin.createNamespace(
+ NamespaceDescriptor.create(tableName.getNamespaceAsString())
+ .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, appInfo.getName()).build());
+ final HTableDescriptor desc = new HTableDescriptor(tableName);
+ desc.addFamily(new HColumnDescriptor("f"));
+ admin.createTable(desc);
+ //wait for created table to be assigned
+ TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return getTableRegionMap().get(desc.getTableName()) != null;
+ }
+ });
+ TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
+
+ // server of test group crash, regions move to default group
+ crashRsInGroup(groupName);
+ assertRegionsInGroup(tableName, RSGroupInfo.DEFAULT_GROUP);
+
+ // server of default group crash, regions move to any other group
+ crashRsInGroup(RSGroupInfo.DEFAULT_GROUP);
+ assertRegionsInGroup(tableName, FALLBACK_GROUP);
+
+ // add a new server to default group, regions move to default group
+ JVMClusterUtil.RegionServerThread t =
+ TEST_UTIL.getMiniHBaseCluster().startRegionServerAndWait(60000);
+ assertTrue(master.balance());
+ assertRegionsInGroup(tableName, RSGroupInfo.DEFAULT_GROUP);
+
+ // add a new server to test group, regions move back
+ JVMClusterUtil.RegionServerThread t1 =
+ TEST_UTIL.getMiniHBaseCluster().startRegionServerAndWait(60000);
+ rsGroupAdmin.moveServers(Collections.singleton(t.getRegionServer().getServerName()
+ .getAddress()), groupName);
+ assertTrue(master.balance());
+ assertRegionsInGroup(tableName, groupName);
+
+ TEST_UTIL.getMiniHBaseCluster().killRegionServer(t.getRegionServer().getServerName());
+ TEST_UTIL.getMiniHBaseCluster().killRegionServer(t1.getRegionServer().getServerName());
+
+ TEST_UTIL.deleteTable(tableName);
+ }
+
+ private void assertRegionsInGroup(TableName table, String group) throws IOException {
+ ProcedureExecutor<MasterProcedureEnv> procExecutor = TEST_UTIL.getMiniHBaseCluster()
+ .getMaster().getMasterProcedureExecutor();
+ for (ProcedureInfo procInfo: procExecutor.listProcedures()) {
+ LOG.debug("Waiting for " + procInfo.getProcName() + " " + procInfo.toString());
+ waitProcedure(procExecutor, procInfo, 10000);
+ }
+ RSGroupInfo rsGroup = rsGroupAdmin.getRSGroupInfo(group);
+ for (HRegionInfo region: master.getAssignmentManager().getRegionStates()
+ .getRegionsOfTable(table)) {
+ Address regionOnServer = master.getAssignmentManager().getRegionStates()
+ .getRegionAssignments().get(region).getAddress();
+ assertTrue(rsGroup.getServers().contains(regionOnServer));
+ }
+ }
+
+ private void crashRsInGroup(String groupName) throws Exception {
+ for (Address server : rsGroupAdmin.getRSGroupInfo(groupName).getServers()) {
+ final ServerName sn = getServerName(server);
+ TEST_UTIL.getMiniHBaseCluster().killRegionServer(sn);
+ TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() {
+ return master.getServerManager().isServerDead(sn);
+ }
+ });
+ }
+ Threads.sleep(1000);
+ TEST_UTIL.waitUntilNoRegionsInTransition(60000);
+ }
+
+ private void waitProcedure(ProcedureExecutor<MasterProcedureEnv> procExecutor,
+ ProcedureInfo procInfo, long timeout) {
+ long start = EnvironmentEdgeManager.currentTime();
+ while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
+ if (procInfo.getProcState() == ProcedureProtos.ProcedureState.INITIALIZING ||
+ (procExecutor.isRunning() && !procExecutor.isFinished(procInfo.getProcId()))) {
+ Threads.sleep(1000);
+ } else {
+ break;
+ }
+ }
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
index 714e72b..d140241 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
@@ -394,6 +395,34 @@ public class MiniHBaseCluster extends HBaseCluster {
}
/**
+ * Starts a region server thread and waits until it is processed by master. Throws an exception
+ * when it can't start a region server or when the region server is not processed by master
+ * within the timeout.
+ *
+ * @return New RegionServerThread
+ */
+ public JVMClusterUtil.RegionServerThread startRegionServerAndWait(long timeout)
+ throws IOException {
+
+ JVMClusterUtil.RegionServerThread t = startRegionServer();
+ ServerName rsServerName = t.getRegionServer().getServerName();
+
+ long start = EnvironmentEdgeManager.currentTime();
+ while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
+ ClusterStatus clusterStatus = getMaster().getClusterStatus();
+ if (clusterStatus != null && clusterStatus.getLiveServersLoad().containsKey(rsServerName)) {
+ return t;
+ }
+ Threads.sleep(100);
+ }
+ if (t.getRegionServer().isOnline()) {
+ throw new IOException("RS: " + rsServerName + " online, but not processed by master");
+ } else {
+ throw new IOException("RS: " + rsServerName + " is offline");
+ }
+ }
+
+ /**
* Cause a region server to exit doing basic clean up only on its way out.
* @param serverNumber Used as index into a list.
*/