You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/10/28 11:37:34 UTC
[hbase] 09/11: HBASE-22987 Calculate the region servers in default group in foreground (#599)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-22514
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 5d4786e1de6c0d2461e4c18db0c636bb693e96da
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Wed Sep 11 22:10:52 2019 +0800
HBASE-22987 Calculate the region servers in default group in foreground (#599)
Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
.../hbase/rsgroup/RSGroupInfoManagerImpl.java | 138 +++++----------------
1 file changed, 32 insertions(+), 106 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
index 6725066..7224869 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
@@ -30,6 +30,7 @@ import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.stream.Collectors;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.NamespaceDescriptor;
@@ -174,8 +175,6 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
private final RSGroupStartupWorker rsGroupStartupWorker;
// contains list of groups that were last flushed to persistent store
private Set<String> prevRSGroups = new HashSet<>();
- private final ServerEventsListenerThread serverEventsListenerThread =
- new ServerEventsListenerThread();
private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
this.masterServices = masterServices;
@@ -184,11 +183,34 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
this.rsGroupStartupWorker = new RSGroupStartupWorker();
}
+ private synchronized void updateDefaultServers() {
+ LOG.info("Updating default servers.");
+ Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(holder.groupName2Group);
+ RSGroupInfo oldDefaultGroupInfo = getRSGroup(RSGroupInfo.DEFAULT_GROUP);
+ assert oldDefaultGroupInfo != null;
+ RSGroupInfo newDefaultGroupInfo =
+ new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers());
+ newDefaultGroupInfo.addAllTables(oldDefaultGroupInfo.getTables());
+ newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroupInfo);
+ // do not need to persist, as we do not persist default group.
+ resetRSGroupMap(newGroupMap);
+ LOG.info("Updated default servers, {} servers", newDefaultGroupInfo.getServers().size());
+ }
private synchronized void init() throws IOException {
refresh(false);
- serverEventsListenerThread.start();
- masterServices.getServerManager().registerListener(serverEventsListenerThread);
+ masterServices.getServerManager().registerListener(new ServerListener() {
+
+ @Override
+ public void serverAdded(ServerName serverName) {
+ updateDefaultServers();
+ }
+
+ @Override
+ public void serverRemoved(ServerName serverName) {
+ updateDefaultServers();
+ }
+ });
migrate();
}
@@ -225,19 +247,11 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
}
/**
- * @param master the master to get online servers for
* @return Set of online Servers named for their hostname and port (not ServerName).
*/
- private static Set<Address> getOnlineServers(final MasterServices master) {
- Set<Address> onlineServers = new HashSet<Address>();
- if (master == null) {
- return onlineServers;
- }
-
- for (ServerName server : master.getServerManager().getOnlineServers().keySet()) {
- onlineServers.add(server.getAddress());
- }
- return onlineServers;
+ private Set<Address> getOnlineServers() {
+ return masterServices.getServerManager().getOnlineServers().keySet().stream()
+ .map(ServerName::getAddress).collect(Collectors.toSet());
}
@Override
@@ -249,8 +263,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
// it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a
// rsgroup of dead servers that are to come back later).
Set<Address> onlineServers =
- dst.getName().equals(RSGroupInfo.DEFAULT_GROUP) ? getOnlineServers(this.masterServices)
- : null;
+ dst.getName().equals(RSGroupInfo.DEFAULT_GROUP) ? getOnlineServers() : null;
for (Address el : servers) {
src.removeServer(el);
if (onlineServers != null) {
@@ -617,25 +630,8 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
this.prevRSGroups.addAll(currentGroups);
}
- // Called by getDefaultServers. Presume it has lock in place.
- private List<ServerName> getOnlineRS() throws IOException {
- if (masterServices != null) {
- return masterServices.getServerManager().getOnlineServersList();
- }
- LOG.debug("Reading online RS from zookeeper");
- List<ServerName> servers = new ArrayList<>();
- try {
- for (String el : ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().rsZNode)) {
- servers.add(ServerName.parseServerName(el));
- }
- } catch (KeeperException e) {
- throw new IOException("Failed to retrieve server list from zookeeper", e);
- }
- return servers;
- }
-
// Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs.
- private SortedSet<Address> getDefaultServers() throws IOException {
+ private SortedSet<Address> getDefaultServers() {
// Build a list of servers in other groups than default group, from rsGroupMap
Set<Address> serversInOtherGroup = new HashSet<>();
for (RSGroupInfo group : listRSGroups() /* get from rsGroupMap */) {
@@ -646,7 +642,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
// Get all online servers from Zookeeper and find out servers in default group
SortedSet<Address> defaultServers = Sets.newTreeSet();
- for (ServerName serverName : getOnlineRS()) {
+ for (ServerName serverName : masterServices.getServerManager().getOnlineServers().keySet()) {
Address server = Address.fromParts(serverName.getHostname(), serverName.getPort());
if (!serversInOtherGroup.contains(server)) { // not in other groups
defaultServers.add(server);
@@ -655,76 +651,6 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
return defaultServers;
}
- // Called by ServerEventsListenerThread. Synchronize on this because redoing
- // the rsGroupMap then writing it out.
- private synchronized void updateDefaultServers(SortedSet<Address> servers) {
- Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group;
- RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP);
- RSGroupInfo newInfo = new RSGroupInfo(info.getName(), servers);
- HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
- newGroupMap.put(newInfo.getName(), newInfo);
- resetRSGroupMap(newGroupMap);
- }
-
- /**
- * Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known
- * servers. Notifications about server changes are received by registering {@link ServerListener}.
- * As a listener, we need to return immediately, so the real work of updating the servers is done
- * asynchronously in this thread.
- */
- private class ServerEventsListenerThread extends Thread implements ServerListener {
- private final Logger LOG = LoggerFactory.getLogger(ServerEventsListenerThread.class);
- private boolean changed = false;
-
- ServerEventsListenerThread() {
- setDaemon(true);
- }
-
- @Override
- public void serverAdded(ServerName serverName) {
- serverChanged();
- }
-
- @Override
- public void serverRemoved(ServerName serverName) {
- serverChanged();
- }
-
- private synchronized void serverChanged() {
- changed = true;
- this.notify();
- }
-
- @Override
- public void run() {
- setName(ServerEventsListenerThread.class.getName() + "-" + masterServices.getServerName());
- SortedSet<Address> prevDefaultServers = new TreeSet<>();
- while (isMasterRunning(masterServices)) {
- try {
- LOG.info("Updating default servers.");
- SortedSet<Address> servers = RSGroupInfoManagerImpl.this.getDefaultServers();
- if (!servers.equals(prevDefaultServers)) {
- RSGroupInfoManagerImpl.this.updateDefaultServers(servers);
- prevDefaultServers = servers;
- LOG.info("Updated with servers: " + servers.size());
- }
- try {
- synchronized (this) {
- while (!changed) {
- wait();
- }
- changed = false;
- }
- } catch (InterruptedException e) {
- LOG.warn("Interrupted", e);
- }
- } catch (IOException e) {
- LOG.warn("Failed to update default servers", e);
- }
- }
- }
- }
-
private class RSGroupStartupWorker extends Thread {
private final Logger LOG = LoggerFactory.getLogger(RSGroupStartupWorker.class);
private volatile boolean online = false;