You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2015/08/14 19:07:23 UTC
[3/6] hbase git commit: HBASE-6721 RegionServer Group based
Assignment (Francis Liu)
http://git-wip-us.apache.org/repos/asf/hbase/blob/66e16163/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupBasedLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupBasedLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupBasedLoadBalancer.java
new file mode 100644
index 0000000..85d203c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupBasedLoadBalancer.java
@@ -0,0 +1,411 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.group;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HostPort;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.LoadBalancer;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * GroupBasedLoadBalancer, used when Region Server Grouping is configured (HBase-6721)
+ * It does region balance based on a table's group membership.
+ *
+ * Most assignment methods contain two exclusive code paths: Online - when the group
+ * table is online and Offline - when it is unavailable.
+ *
+ * During Offline, assignments are assigned based on cached information in zookeeper.
+ * If unavailable (ie bootstrap) then regions are assigned randomly.
+ *
+ * Once the GROUP table has been assigned, the balancer switches to Online and will then
+ * start providing appropriate assignments for user tables.
+ *
+ */
+@InterfaceAudience.Public
+public class GroupBasedLoadBalancer implements GroupableBalancer, LoadBalancer {
+ /** Config for pluggable load balancers */
+ public static final String HBASE_GROUP_LOADBALANCER_CLASS = "hbase.group.grouploadbalancer.class";
+
+ private static final Log LOG = LogFactory.getLog(GroupBasedLoadBalancer.class);
+
+ private Configuration config;
+ private ClusterStatus clusterStatus;
+ private MasterServices masterServices;
+ private GroupInfoManager groupManager;
+ private LoadBalancer internalBalancer;
+
+ // No-arg constructor: required so LoadBalancerFactory can instantiate the
+ // balancer via reflection. groupManager is wired later via setGroupInfoManager.
+ @InterfaceAudience.Private
+ public GroupBasedLoadBalancer() {
+ }
+
+ // This constructor should only be used for unit testing: it injects the
+ // group manager directly instead of relying on the normal wiring.
+ @InterfaceAudience.Private
+ public GroupBasedLoadBalancer(GroupInfoManager groupManager) {
+ this.groupManager = groupManager;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return config;
+ }
+
+ /** Stores the configuration; it is propagated to the internal balancer in initialize(). */
+ @Override
+ public void setConf(Configuration conf) {
+ this.config = conf;
+ }
+
+ /** Cluster status is handed to the internal balancer in initialize(). */
+ @Override
+ public void setClusterStatus(ClusterStatus st) {
+ this.clusterStatus = st;
+ }
+
+ /** Master services are used here to unassign misplaced regions (see correctAssignments). */
+ @Override
+ public void setMasterServices(MasterServices masterServices) {
+ this.masterServices = masterServices;
+ }
+
+ /**
+ * Balances each region server group independently: misplaced regions are
+ * first unassigned via correctAssignments(), then the internal balancer is
+ * invoked once per group with only that group's servers and regions.
+ *
+ * @throws IllegalStateException when the group table is not yet online
+ */
+ @Override
+ public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState)
+ throws HBaseIOException {
+
+ if (!isOnline()) {
+ throw new IllegalStateException(GroupInfoManager.GROUP_TABLE_NAME+
+ " is not online, unable to perform balance");
+ }
+
+ Map<ServerName,List<HRegionInfo>> correctedState = correctAssignments(clusterState);
+ List<RegionPlan> regionPlans = new ArrayList<RegionPlan>();
+ try {
+ for (GroupInfo info : groupManager.listGroups()) {
+ Map<ServerName, List<HRegionInfo>> groupClusterState = new HashMap<ServerName, List<HRegionInfo>>();
+ // Restrict the (corrected) cluster state to servers belonging to this group.
+ for (HostPort sName : info.getServers()) {
+ for(ServerName curr: clusterState.keySet()) {
+ if(curr.getHostPort().equals(sName)) {
+ groupClusterState.put(curr, correctedState.get(curr));
+ }
+ }
+ }
+ List<RegionPlan> groupPlans = this.internalBalancer
+ .balanceCluster(groupClusterState);
+ if (groupPlans != null) {
+ regionPlans.addAll(groupPlans);
+ }
+ }
+ } catch (IOException exp) {
+ // On failure return an empty plan rather than a partial one.
+ LOG.warn("Exception while balancing cluster.", exp);
+ regionPlans.clear();
+ }
+ return regionPlans;
+ }
+
+ /**
+ * Round-robin assignment performed per group: regions and candidate servers
+ * are bucketed by group via generateGroupMaps(), then each bucket is
+ * delegated to the internal balancer.
+ */
+ @Override
+ public Map<ServerName, List<HRegionInfo>> roundRobinAssignment (
+ List<HRegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
+ Map<ServerName, List<HRegionInfo>> assignments = Maps.newHashMap();
+ ListMultimap<String,HRegionInfo> regionMap = ArrayListMultimap.create();
+ ListMultimap<String,ServerName> serverMap = ArrayListMultimap.create();
+ generateGroupMaps(regions, servers, regionMap, serverMap);
+ for(String groupKey : regionMap.keySet()) {
+ if (regionMap.get(groupKey).size() > 0) {
+ Map<ServerName, List<HRegionInfo>> result =
+ this.internalBalancer.roundRobinAssignment(
+ regionMap.get(groupKey),
+ serverMap.get(groupKey));
+ if(result != null) {
+ assignments.putAll(result);
+ }
+ }
+ }
+ return assignments;
+ }
+
+ /**
+ * Retains existing assignments where the hosting server is still a member
+ * of the region's group; regions found on a server outside their group are
+ * randomly re-assigned within the correct group. Regions whose group has no
+ * live server are parked on BOGUS_SERVER_NAME so they end up in RIT instead
+ * of being silently dropped.
+ */
+ @Override
+ public Map<ServerName, List<HRegionInfo>> retainAssignment(
+     Map<HRegionInfo, ServerName> regions, List<ServerName> servers) throws HBaseIOException {
+   try {
+     Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<ServerName, List<HRegionInfo>>();
+     ListMultimap<String, HRegionInfo> groupToRegion = ArrayListMultimap.create();
+     Set<HRegionInfo> misplacedRegions = getMisplacedRegions(regions);
+     for (HRegionInfo region : regions.keySet()) {
+       if (!misplacedRegions.contains(region)) {
+         String groupName = groupManager.getGroupOfTable(region.getTable());
+         groupToRegion.put(groupName, region);
+       }
+     }
+     // Now the "groupToRegion" map has only the regions which have correct
+     // assignments.
+     for (String key : groupToRegion.keySet()) {
+       Map<HRegionInfo, ServerName> currentAssignmentMap = new TreeMap<HRegionInfo, ServerName>();
+       List<HRegionInfo> regionList = groupToRegion.get(key);
+       GroupInfo info = groupManager.getGroup(key);
+       List<ServerName> candidateList = filterOfflineServers(info, servers);
+       for (HRegionInfo region : regionList) {
+         currentAssignmentMap.put(region, regions.get(region));
+       }
+       if (candidateList.size() > 0) {
+         assignments.putAll(this.internalBalancer.retainAssignment(
+             currentAssignmentMap, candidateList));
+       }
+     }
+
+     for (HRegionInfo region : misplacedRegions) {
+       String groupName = groupManager.getGroupOfTable(region.getTable());
+       GroupInfo info = groupManager.getGroup(groupName);
+       List<ServerName> candidateList = filterOfflineServers(info, servers);
+       ServerName server = this.internalBalancer.randomAssignment(region, candidateList);
+       if (server != null) {
+         // BUG FIX: the original if/else-if created the server's empty list
+         // but never added the region to it, silently dropping the region.
+         // Always append after ensuring the list exists.
+         if (!assignments.containsKey(server)) {
+           assignments.put(server, new ArrayList<HRegionInfo>());
+         }
+         assignments.get(server).add(region);
+       } else {
+         // If no server is available, assign to bogus so it ends up in RIT.
+         if (!assignments.containsKey(BOGUS_SERVER_NAME)) {
+           assignments.put(BOGUS_SERVER_NAME, new ArrayList<HRegionInfo>());
+         }
+         assignments.get(BOGUS_SERVER_NAME).add(region);
+       }
+     }
+     return assignments;
+   } catch (IOException e) {
+     throw new HBaseIOException("Failed to do online retain assignment", e);
+   }
+ }
+
+ /**
+ * Immediate assignment performed per group; each group's regions and live
+ * servers are delegated to the internal balancer.
+ */
+ @Override
+ public Map<HRegionInfo, ServerName> immediateAssignment(
+ List<HRegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
+ Map<HRegionInfo,ServerName> assignments = Maps.newHashMap();
+ ListMultimap<String,HRegionInfo> regionMap = LinkedListMultimap.create();
+ ListMultimap<String,ServerName> serverMap = LinkedListMultimap.create();
+ generateGroupMaps(regions, servers, regionMap, serverMap);
+ for(String groupKey : regionMap.keySet()) {
+ if (regionMap.get(groupKey).size() > 0) {
+ assignments.putAll(
+ this.internalBalancer.immediateAssignment(
+ regionMap.get(groupKey),
+ serverMap.get(groupKey)));
+ }
+ }
+ return assignments;
+ }
+
+ /**
+ * Picks a random server for the region, restricted to the live servers of
+ * the region's own group.
+ */
+ @Override
+ public ServerName randomAssignment(HRegionInfo region,
+     List<ServerName> servers) throws HBaseIOException {
+   ListMultimap<String, HRegionInfo> regionsByGroup = LinkedListMultimap.create();
+   ListMultimap<String, ServerName> serversByGroup = LinkedListMultimap.create();
+   generateGroupMaps(Lists.newArrayList(region), servers, regionsByGroup, serversByGroup);
+   // Exactly one group key exists since a single region was bucketed.
+   String groupKey = regionsByGroup.keySet().iterator().next();
+   return this.internalBalancer.randomAssignment(region, serversByGroup.get(groupKey));
+ }
+
+ /**
+ * Buckets the given regions and servers by group name. For every group that
+ * owns at least one region, its online servers are added to serverMap; a
+ * group with no live server gets BOGUS_SERVER_NAME so its regions land in
+ * RIT instead of being lost.
+ */
+ private void generateGroupMaps(
+ List<HRegionInfo> regions,
+ List<ServerName> servers,
+ ListMultimap<String, HRegionInfo> regionMap,
+ ListMultimap<String, ServerName> serverMap) throws HBaseIOException {
+ try {
+ for (HRegionInfo region : regions) {
+ String groupName = groupManager.getGroupOfTable(region.getTable());
+ if(groupName == null) {
+ // NOTE(review): a null group name is still used as a multimap key below;
+ // confirm that getGroup(null) downstream is intended to behave gracefully.
+ LOG.warn("Group for table "+region.getTable()+" is null");
+ }
+ regionMap.put(groupName, region);
+ }
+ for (String groupKey : regionMap.keySet()) {
+ GroupInfo info = groupManager.getGroup(groupKey);
+ serverMap.putAll(groupKey, filterOfflineServers(info, servers));
+ if(serverMap.get(groupKey).size() < 1) {
+ serverMap.put(groupKey, BOGUS_SERVER_NAME);
+ }
+ }
+ } catch(IOException e) {
+ throw new HBaseIOException("Failed to generate group maps", e);
+ }
+ }
+
+ /**
+ * Returns the members of {@code groupInfo} that are currently online, or an
+ * empty list when the group is unknown (null).
+ */
+ private List<ServerName> filterOfflineServers(GroupInfo groupInfo,
+     List<ServerName> onlineServers) {
+   if (groupInfo != null) {
+     return filterServers(groupInfo.getServers(), onlineServers);
+   } else {
+     LOG.debug("Group Information found to be null. Some regions might be unassigned.");
+     // Typed factory instead of the raw Collections.EMPTY_LIST constant,
+     // avoiding an unchecked-assignment warning.
+     return Collections.emptyList();
+   }
+ }
+
+ /**
+ * Intersects the group's configured servers with the online servers.
+ * Runs in O(|servers| * |onlineServers|); acceptable for typical cluster sizes.
+ *
+ * @param servers the group's configured servers
+ * @param onlineServers List of servers which are online.
+ * @return the online members of the given server set
+ */
+ private List<ServerName> filterServers(Collection<HostPort> servers,
+ Collection<ServerName> onlineServers) {
+ ArrayList<ServerName> finalList = new ArrayList<ServerName>();
+ for (HostPort server : servers) {
+ for(ServerName curr: onlineServers) {
+ if(curr.getHostPort().equals(server)) {
+ finalList.add(curr);
+ }
+ }
+ }
+ return finalList;
+ }
+
+ /**
+ * Groups a list of regions by their table's group name.
+ * NOTE(review): not referenced anywhere else in this file; candidate for
+ * removal if no other caller exists.
+ */
+ private ListMultimap<String, HRegionInfo> groupRegions(
+ List<HRegionInfo> regionList) throws IOException {
+ ListMultimap<String, HRegionInfo> regionGroup = ArrayListMultimap
+ .create();
+ for (HRegionInfo region : regionList) {
+ String groupName = groupManager.getGroupOfTable(region.getTable());
+ regionGroup.put(groupName, region);
+ }
+ return regionGroup;
+ }
+
+ /**
+ * Returns the set of regions whose current server is not a member of the
+ * region's group (or whose group cannot be resolved at all).
+ */
+ private Set<HRegionInfo> getMisplacedRegions(
+     Map<HRegionInfo, ServerName> regions) throws IOException {
+   Set<HRegionInfo> misplacedRegions = new HashSet<HRegionInfo>();
+   for (HRegionInfo region : regions.keySet()) {
+     ServerName assignedServer = regions.get(region);
+     GroupInfo info = groupManager.getGroup(groupManager.getGroupOfTable(region.getTable()));
+     if (assignedServer != null &&
+         (info == null || !info.containsServer(assignedServer.getHostPort()))) {
+       // BUG FIX: the condition above explicitly admits info == null, so the
+       // original unconditional info.getName() in this log line could NPE.
+       LOG.warn("Found misplaced region: "+region.getRegionNameAsString()+
+           " on server: "+assignedServer+
+           " found in group: "+groupManager.getGroupOfServer(assignedServer.getHostPort())+
+           " outside of group: "+(info == null ? null : info.getName()));
+       misplacedRegions.add(region);
+     }
+   }
+   return misplacedRegions;
+ }
+
+ /**
+ * Splits the existing assignments into correctly-placed regions (returned)
+ * and misplaced ones, which are unassigned so the assignment manager can
+ * move them onto a server inside the right group.
+ */
+ private Map<ServerName, List<HRegionInfo>> correctAssignments(
+ Map<ServerName, List<HRegionInfo>> existingAssignments){
+ Map<ServerName, List<HRegionInfo>> correctAssignments = new TreeMap<ServerName, List<HRegionInfo>>();
+ List<HRegionInfo> misplacedRegions = new LinkedList<HRegionInfo>();
+ for (ServerName sName : existingAssignments.keySet()) {
+ correctAssignments.put(sName, new LinkedList<HRegionInfo>());
+ List<HRegionInfo> regions = existingAssignments.get(sName);
+ for (HRegionInfo region : regions) {
+ GroupInfo info = null;
+ try {
+ info = groupManager.getGroup(groupManager.getGroupOfTable(region.getTable()));
+ }catch(IOException exp){
+ // Treated the same as an unresolved group: the region is considered misplaced.
+ LOG.debug("Group information null for region of table " + region.getTable(),
+ exp);
+ }
+ if ((info == null) || (!info.containsServer(sName.getHostPort()))) {
+ // Misplaced region.
+ misplacedRegions.add(region);
+ } else {
+ correctAssignments.get(sName).add(region);
+ }
+ }
+ }
+
+ //TODO bulk unassign?
+ //unassign misplaced regions, so that they are assigned to correct groups.
+ for(HRegionInfo info: misplacedRegions) {
+ this.masterServices.getAssignmentManager().unassign(info);
+ }
+ return correctAssignments;
+ }
+
+ /**
+ * Creates and configures the internal per-group balancer. The class is read
+ * from hbase.group.grouploadbalancer.class, defaulting to
+ * StochasticLoadBalancer, and receives the previously-set conf, cluster
+ * status and master services.
+ */
+ @Override
+ public void initialize() throws HBaseIOException {
+ // Create the balancer
+ Class<? extends LoadBalancer> balancerKlass = config.getClass(
+ HBASE_GROUP_LOADBALANCER_CLASS,
+ StochasticLoadBalancer.class, LoadBalancer.class);
+ internalBalancer = ReflectionUtils.newInstance(balancerKlass, config);
+ internalBalancer.setClusterStatus(clusterStatus);
+ internalBalancer.setMasterServices(masterServices);
+ internalBalancer.setConf(config);
+ internalBalancer.initialize();
+ }
+
+ /** True once a group manager is wired in and reports itself online. */
+ public boolean isOnline() {
+ return groupManager != null && groupManager.isOnline();
+ }
+
+ @InterfaceAudience.Private
+ public GroupInfoManager getGroupInfoManager() throws IOException {
+ return groupManager;
+ }
+
+ // No-op: this balancer does not track individual region online events.
+ @Override
+ public void regionOnline(HRegionInfo regionInfo, ServerName sn) {
+ }
+
+ // No-op: this balancer does not track individual region offline events.
+ @Override
+ public void regionOffline(HRegionInfo regionInfo) {
+ }
+
+ // No-op: nothing to release on stop.
+ @Override
+ public void stop(String why) {
+ }
+
+ @Override
+ public boolean isStopped() {
+ return false;
+ }
+
+ /** Injects the group manager; also switches isOnline() to delegate to it. */
+ @Override
+ public void setGroupInfoManager(GroupInfoManager groupInfoManager) throws IOException {
+ this.groupManager = groupInfoManager;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/66e16163/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupInfoManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupInfoManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupInfoManager.java
new file mode 100644
index 0000000..4ed7fa8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupInfoManager.java
@@ -0,0 +1,129 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.group;
+
+import org.apache.hadoop.hbase.HostPort;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Interface used to manage GroupInfo storage. An implementation
+ * has the option to support offline mode.
+ * See {@link GroupBasedLoadBalancer}
+ */
+public interface GroupInfoManager {
+ //Assigned before user tables
+ public static final TableName GROUP_TABLE_NAME =
+ TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR,"rsgroup");
+ public static final byte[] GROUP_TABLE_NAME_BYTES = GROUP_TABLE_NAME.toBytes();
+ public static final String groupZNode = "groupInfo";
+ public static final byte[] META_FAMILY_BYTES = Bytes.toBytes("m");
+ public static final byte[] ROW_KEY = {0};
+
+
+ /**
+ * Adds the group.
+ *
+ * @param groupInfo the group name
+ * @throws java.io.IOException Signals that an I/O exception has occurred.
+ */
+ void addGroup(GroupInfo groupInfo) throws IOException;
+
+ /**
+ * Remove a region server group.
+ *
+ * @param groupName the group name
+ * @throws java.io.IOException Signals that an I/O exception has occurred.
+ */
+ void removeGroup(String groupName) throws IOException;
+
+ /**
+ * move servers to a new group.
+ * @param hostPorts list of servers, must be part of the same group
+ * @param srcGroup
+ * @param dstGroup
+ * @return true if move was successful
+ * @throws java.io.IOException
+ */
+ boolean moveServers(Set<HostPort> hostPorts, String srcGroup, String dstGroup) throws IOException;
+
+ /**
+ * Gets the group info of server.
+ *
+ * @param hostPort the server
+ * @return An instance of GroupInfo
+ */
+ GroupInfo getGroupOfServer(HostPort hostPort) throws IOException;
+
+ /**
+ * Gets the group information.
+ *
+ * @param groupName the group name
+ * @return An instance of GroupInfo
+ */
+ GroupInfo getGroup(String groupName) throws IOException;
+
+ /**
+ * Get the group membership of a table
+ * @param tableName
+ * @return Group name of table
+ * @throws java.io.IOException
+ */
+ String getGroupOfTable(TableName tableName) throws IOException;
+
+ /**
+ * Set the group membership of a set of tables
+ *
+ * @param tableNames
+ * @param groupName
+ * @throws java.io.IOException
+ */
+ void moveTables(Set<TableName> tableNames, String groupName) throws IOException;
+
+ /**
+ * List the groups
+ *
+ * @return list of GroupInfo
+ * @throws java.io.IOException
+ */
+ List<GroupInfo> listGroups() throws IOException;
+
+ /**
+ * Refresh/reload the group information from
+ * the persistent store
+ *
+ * @throws java.io.IOException
+ */
+ void refresh() throws IOException;
+
+ /**
+ * Whether the manager is able to fully
+ * return group metadata
+ *
+ * @return
+ */
+ boolean isOnline();
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/66e16163/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupInfoManagerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupInfoManagerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupInfoManagerImpl.java
new file mode 100644
index 0000000..83c0c2d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupInfoManagerImpl.java
@@ -0,0 +1,667 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.group;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HostPort;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.MetaScanner;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.ServerListener;
+import org.apache.hadoop.hbase.master.handler.CreateTableHandler;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
+import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
+import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKTable;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This is an implementation of {@link GroupInfoManager}. Which makes
+ * use of an HBase table as the persistence store for the group information.
+ * It also makes use of zookeeper to store group information needed
+ * for bootstrapping during offline mode.
+ */
+public class GroupInfoManagerImpl implements GroupInfoManager, ServerListener {
+ private static final Log LOG = LogFactory.getLog(GroupInfoManagerImpl.class);
+
+ /** Table descriptor for <code>hbase:rsgroup</code> catalog table */
+ private final static HTableDescriptor GROUP_TABLE_DESC;
+ static {
+ GROUP_TABLE_DESC = new HTableDescriptor(GROUP_TABLE_NAME_BYTES);
+ GROUP_TABLE_DESC.addFamily(new HColumnDescriptor(META_FAMILY_BYTES));
+ // Splitting disabled: the table appears to hold a single row (ROW_KEY).
+ GROUP_TABLE_DESC.setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
+ }
+
+ //Access to this map should always be synchronized.
+ private volatile Map<String, GroupInfo> groupMap;
+ private volatile Map<TableName, String> tableMap;
+ private MasterServices master;
+ private HTableInterface groupTable;
+ private ZooKeeperWatcher watcher;
+ private GroupStartupWorker groupStartupWorker;
+ //contains list of groups that were last flushed to persistent store
+ private volatile Set<String> prevGroups;
+ private GroupSerDe groupSerDe;
+ private DefaultServerUpdater defaultServerUpdater;
+
+
+ /**
+ * Builds the manager: performs an initial refresh() (offline/zk path during
+ * bootstrap), then starts the startup worker that waits for the group table
+ * and the default-server updater. This manager registers itself as a server
+ * listener. NOTE(review): GroupStartupWorker/DefaultServerUpdater are
+ * defined outside this view — confirm their lifecycle assumptions.
+ */
+ public GroupInfoManagerImpl(MasterServices master) throws IOException {
+ this.groupMap = Collections.EMPTY_MAP;
+ this.tableMap = Collections.EMPTY_MAP;
+ groupSerDe = new GroupSerDe();
+ this.master = master;
+ this.watcher = master.getZooKeeper();
+ groupStartupWorker = new GroupStartupWorker(this, master);
+ prevGroups = new HashSet<String>();
+ refresh();
+ groupStartupWorker.start();
+ defaultServerUpdater = new DefaultServerUpdater(this);
+ master.getServerManager().registerListener(this);
+ defaultServerUpdater.start();
+ }
+
+ /**
+ * Adds a new group and persists the updated group map.
+ *
+ * @param groupInfo the group to add
+ * @throws IOException if a group with that name already exists, the name is
+ * the reserved default group, or persisting the change fails
+ */
+ @Override
+ public synchronized void addGroup(GroupInfo groupInfo) throws IOException {
+ if (groupMap.get(groupInfo.getName()) != null ||
+ groupInfo.getName().equals(GroupInfo.DEFAULT_GROUP)) {
+ throw new DoNotRetryIOException("Group already exists: "+groupInfo.getName());
+ }
+ Map<String, GroupInfo> newGroupMap = Maps.newHashMap(groupMap);
+ newGroupMap.put(groupInfo.getName(), groupInfo);
+ flushConfig(newGroupMap);
+ }
+
+ /**
+ * Moves servers from srcGroup to dstGroup, persisting the result.
+ * NOTE(review): assumes both groups exist — getGroup() returning null would
+ * NPE inside the GroupInfo copy constructor; confirm callers validate names.
+ *
+ * @return true if at least one server was actually removed from srcGroup
+ */
+ @Override
+ public synchronized boolean moveServers(Set<HostPort> hostPorts, String srcGroup,
+ String dstGroup) throws IOException {
+ // Copy-on-write: mutate copies, never the published GroupInfo instances.
+ GroupInfo src = new GroupInfo(getGroup(srcGroup));
+ GroupInfo dst = new GroupInfo(getGroup(dstGroup));
+ boolean foundOne = false;
+ for(HostPort el: hostPorts) {
+ foundOne = src.removeServer(el) || foundOne;
+ dst.addServer(el);
+ }
+
+ Map<String,GroupInfo> newGroupMap = Maps.newHashMap(groupMap);
+ newGroupMap.put(src.getName(), src);
+ newGroupMap.put(dst.getName(), dst);
+
+ flushConfig(newGroupMap);
+ return foundOne;
+ }
+
+ /**
+ * Gets the group a server belongs to; falls back to the default group when
+ * the server is not listed in any explicit group.
+ *
+ * @param hostPort the server
+ * @return An instance of GroupInfo.
+ */
+ @Override
+ public GroupInfo getGroupOfServer(HostPort hostPort) throws IOException {
+ for (GroupInfo info : groupMap.values()) {
+ if (info.containsServer(hostPort)){
+ return info;
+ }
+ }
+ return getGroup(GroupInfo.DEFAULT_GROUP);
+ }
+
+ /**
+ * Gets the group information.
+ *
+ * @param groupName
+ * the group name
+ * @return the GroupInfo, or null when no such group exists
+ */
+ @Override
+ public GroupInfo getGroup(String groupName) throws IOException {
+ GroupInfo groupInfo = groupMap.get(groupName);
+ return groupInfo;
+ }
+
+
+
+ /** Returns the group name a table is mapped to, or null when unmapped. */
+ @Override
+ public String getGroupOfTable(TableName tableName) throws IOException {
+ return tableMap.get(tableName);
+ }
+
+ /**
+ * Sets the group membership of a set of tables: each table is removed from
+ * its current group (if any) and, when groupName is non-null, added to the
+ * destination group. The result is persisted in a single flush.
+ *
+ * @param tableNames tables to move
+ * @param groupName destination group, or null to only drop the mapping
+ */
+ @Override
+ public synchronized void moveTables(Set<TableName> tableNames, String groupName) throws IOException {
+ if (groupName != null && !groupMap.containsKey(groupName)) {
+ throw new DoNotRetryIOException("Group "+groupName+" does not exist or is a special group")
+ ;
+ }
+ Map<String,GroupInfo> newGroupMap = Maps.newHashMap(groupMap);
+ for(TableName tableName: tableNames) {
+ if (tableMap.containsKey(tableName)) {
+ // Copy-on-write: never mutate a GroupInfo already published in groupMap.
+ GroupInfo src = new GroupInfo(groupMap.get(tableMap.get(tableName)));
+ src.removeTable(tableName);
+ newGroupMap.put(src.getName(), src);
+ }
+ if(groupName != null) {
+ GroupInfo dst = new GroupInfo(newGroupMap.get(groupName));
+ dst.addTable(tableName);
+ newGroupMap.put(dst.getName(), dst);
+ }
+ }
+
+ flushConfig(newGroupMap);
+ }
+
+
+ /**
+ * Delete a region server group.
+ *
+ * @param groupName the group name
+ * @throws IOException if the group does not exist, is the reserved default
+ * group, or persisting the removal fails
+ */
+ @Override
+ public synchronized void removeGroup(String groupName) throws IOException {
+ if (!groupMap.containsKey(groupName) || groupName.equals(GroupInfo.DEFAULT_GROUP)) {
+ throw new DoNotRetryIOException("Group "+groupName+" does not exist or is a reserved group");
+ }
+ Map<String,GroupInfo> newGroupMap = Maps.newHashMap(groupMap);
+ newGroupMap.remove(groupName);
+ flushConfig(newGroupMap);
+ }
+
+ /** Returns a fresh, caller-mutable snapshot of all known groups. */
+ @Override
+ public List<GroupInfo> listGroups() throws IOException {
+   return Lists.newLinkedList(groupMap.values());
+ }
+
+ /** Online once the startup worker reports the group table is usable. */
+ @Override
+ public boolean isOnline() {
+ return groupStartupWorker.isOnline();
+ }
+
+ /** Reloads group metadata from the persistent store (table or zk cache). */
+ @Override
+ public synchronized void refresh() throws IOException {
+ refresh(false);
+ }
+
+ /**
+ * Reloads group metadata. When online (or forced), the group table is the
+ * source of truth; otherwise the zookeeper cache is read. The default group
+ * is then rebuilt to own every table not claimed by another group plus the
+ * system/special tables, and immutable group/table maps are published.
+ *
+ * @param forceOnline read from the group table even if isOnline() is false
+ */
+ private synchronized void refresh(boolean forceOnline) throws IOException {
+   List<GroupInfo> groupList = new LinkedList<GroupInfo>();
+
+   // Overwrite anything read from zk; the group table is the source of truth.
+   if (forceOnline || isOnline()) {
+     LOG.debug("Refreshing in Online mode.");
+     if (groupTable == null) {
+       groupTable = new HTable(master.getConfiguration(), GROUP_TABLE_NAME);
+     }
+     groupList.addAll(groupSerDe.retrieveGroupList(groupTable));
+   } else {
+     // FIX: corrected "Refershing" typo in the original log message.
+     LOG.debug("Refreshing in Offline mode.");
+     String groupBasePath = ZKUtil.joinZNode(watcher.baseZNode, groupZNode);
+     groupList.addAll(groupSerDe.retrieveGroupList(watcher, groupBasePath));
+   }
+
+   // Collect every known table; whatever no explicit group claims below
+   // becomes part of the default group.
+   NavigableSet<TableName> orphanTables = new TreeSet<TableName>();
+   for (String entry : master.getTableDescriptors().getAll().keySet()) {
+     orphanTables.add(TableName.valueOf(entry));
+   }
+
+   // Before master init the descriptor scan may not cover system tables, so
+   // enumerate them explicitly; afterwards query the system namespace.
+   List<TableName> specialTables;
+   if (!master.isInitialized()) {
+     specialTables = new ArrayList<TableName>();
+     specialTables.add(AccessControlLists.ACL_TABLE_NAME);
+     specialTables.add(TableName.META_TABLE_NAME);
+     specialTables.add(TableName.NAMESPACE_TABLE_NAME);
+     specialTables.add(GroupInfoManager.GROUP_TABLE_NAME);
+   } else {
+     specialTables =
+         master.listTableNamesByNamespace(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR);
+   }
+
+   orphanTables.addAll(specialTables);
+   for (GroupInfo group : groupList) {
+     if (!group.getName().equals(GroupInfo.DEFAULT_GROUP)) {
+       orphanTables.removeAll(group.getTables());
+     }
+   }
+
+   // Appended last so it overwrites any default group loaded from the
+   // region group table or zk when the maps are populated below.
+   groupList.add(new GroupInfo(GroupInfo.DEFAULT_GROUP,
+       new TreeSet<HostPort>(getDefaultServers()),
+       orphanTables));
+
+   // Publish immutable snapshots so readers never observe partial state.
+   HashMap<String, GroupInfo> newGroupMap = Maps.newHashMap();
+   HashMap<TableName, String> newTableMap = Maps.newHashMap();
+   for (GroupInfo group : groupList) {
+     newGroupMap.put(group.getName(), group);
+     for (TableName table : group.getTables()) {
+       newTableMap.put(table, group.getName());
+     }
+   }
+   groupMap = Collections.unmodifiableMap(newGroupMap);
+   tableMap = Collections.unmodifiableMap(newTableMap);
+
+   prevGroups.clear();
+   prevGroups.addAll(groupMap.keySet());
+ }
+
+ /**
+ * Persists the given group map to the single-row group table in one atomic
+ * RowMutations: puts for all current groups, deletes for groups removed
+ * since the last flush (prevGroups). Returns the rebuilt table-to-group map.
+ * NOTE(review): assumes groupTable is non-null, i.e. only invoked while
+ * online — flushConfig() checks isOnline() first.
+ */
+ private synchronized Map<TableName,String> flushConfigTable(Map<String,GroupInfo> newGroupMap) throws IOException {
+ Map<TableName,String> newTableMap = Maps.newHashMap();
+ Put put = new Put(ROW_KEY);
+ Delete delete = new Delete(ROW_KEY);
+
+ //populate deletes
+ for(String groupName : prevGroups) {
+ if(!newGroupMap.containsKey(groupName)) {
+ delete.deleteColumns(META_FAMILY_BYTES, Bytes.toBytes(groupName));
+ }
+ }
+
+ //populate puts
+ for(GroupInfo groupInfo : newGroupMap.values()) {
+ RSGroupProtos.GroupInfo proto = ProtobufUtil.toProtoGroupInfo(groupInfo);
+ put.add(META_FAMILY_BYTES,
+ Bytes.toBytes(groupInfo.getName()),
+ proto.toByteArray());
+ for(TableName entry: groupInfo.getTables()) {
+ newTableMap.put(entry, groupInfo.getName());
+ }
+ }
+
+ // Apply puts and deletes together so the row is never half-updated.
+ RowMutations rowMutations = new RowMutations(ROW_KEY);
+ if(put.size() > 0) {
+ rowMutations.add(put);
+ }
+ if(delete.size() > 0) {
+ rowMutations.add(delete);
+ }
+ if(rowMutations.getMutations().size() > 0) {
+ groupTable.mutateRow(rowMutations);
+ }
+ return newTableMap;
+ }
+
+ /**
+  * Persists the given group configuration to both stores (the group table via
+  * flushConfigTable, then ZooKeeper) and publishes it to the in-memory maps.
+  * The group table is the source of truth; the ZK mirror is what offline or
+  * starting-up readers consult.
+  *
+  * @param newGroupMap complete replacement group configuration
+  * @throws IOException if called while offline, or if either store fails
+  */
+ private synchronized void flushConfig(Map<String,GroupInfo> newGroupMap) throws IOException {
+ Map<TableName, String> newTableMap;
+ // Mutations are rejected until startup has brought the group table online.
+ if(!isOnline()) {
+ LOG.error("Still in Offline mode.");
+ throw new IOException("Still in Offline mode.");
+ }
+
+ newTableMap = flushConfigTable(newGroupMap);
+
+ //make changes visible since it has been
+ //persisted in the source of truth
+ groupMap = Collections.unmodifiableMap(newGroupMap);
+ tableMap = Collections.unmodifiableMap(newTableMap);
+
+
+ try {
+ //Write zk data first since that's what we'll read first
+ String groupBasePath = ZKUtil.joinZNode(watcher.baseZNode, groupZNode);
+ ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufUtil.PB_MAGIC);
+
+ List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<ZKUtil.ZKUtilOp>(newGroupMap.size());
+ // Queue deletions for groups dropped since the previous flush.
+ for(String groupName : prevGroups) {
+ if(!newGroupMap.containsKey(groupName)) {
+ String znode = ZKUtil.joinZNode(groupBasePath, groupName);
+ zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
+ }
+ }
+
+
+ for(GroupInfo groupInfo : newGroupMap.values()) {
+ String znode = ZKUtil.joinZNode(groupBasePath, groupInfo.getName());
+ RSGroupProtos.GroupInfo proto = ProtobufUtil.toProtoGroupInfo(groupInfo);
+ LOG.debug("Updating znode: "+znode);
+ // NOTE(review): each znode is created here, then a delete+create pair is
+ // queued in the multi -- presumably to force a fresh version/watch event
+ // even when the znode already exists; TODO confirm this is intentional.
+ ZKUtil.createAndFailSilent(watcher, znode);
+ zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
+ zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode,
+ ProtobufUtil.prependPBMagic(proto.toByteArray())));
+ }
+ LOG.debug("Writing ZK GroupInfo count: " + zkOps.size());
+
+ ZKUtil.multiOrSequential(watcher, zkOps, false);
+ } catch (KeeperException e) {
+ // The table write already succeeded, so ZK is now out of sync with the
+ // source of truth; abort the master rather than run half-applied.
+ LOG.error("Failed to write to groupZNode", e);
+ master.abort("Failed to write to groupZNode", e);
+ throw new IOException("Failed to write to groupZNode",e);
+ }
+
+ prevGroups.clear();
+ prevGroups.addAll(newGroupMap.keySet());
+ }
+
+ /**
+  * Returns the region servers that are currently online, preferring the
+  * master's ServerManager when a master is available and otherwise falling
+  * back to listing the rs znode children directly from ZooKeeper.
+  *
+  * @throws IOException if the ZooKeeper listing fails
+  */
+ private List<ServerName> getOnlineRS() throws IOException {
+   if (master != null) {
+     return master.getServerManager().getOnlineServersList();
+   }
+   LOG.debug("Reading online RS from zookeeper");
+   List<ServerName> online = new LinkedList<ServerName>();
+   try {
+     for (String child : ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode)) {
+       online.add(ServerName.parseServerName(child));
+     }
+   } catch (KeeperException e) {
+     throw new IOException("Failed to retrieve server list from zookeeper", e);
+   }
+   return online;
+ }
+
+ /**
+  * Computes the membership of the default group: every online region server
+  * that is not explicitly assigned to some other (non-default) group.
+  *
+  * @return online servers that belong to the default group
+  * @throws IOException if the online server list cannot be obtained
+  */
+ private List<HostPort> getDefaultServers() throws IOException {
+   List<HostPort> defaultServers = new LinkedList<HostPort>();
+   online:
+   for (ServerName onlineServer : getOnlineRS()) {
+     HostPort candidate = new HostPort(onlineServer.getHostname(), onlineServer.getPort());
+     for (GroupInfo group : groupMap.values()) {
+       // Claimed by an explicit group: not a default-group member.
+       if (!GroupInfo.DEFAULT_GROUP.equals(group.getName())
+           && group.containsServer(candidate)) {
+         continue online;
+       }
+     }
+     defaultServers.add(candidate);
+   }
+   return defaultServers;
+ }
+
+ /**
+  * Replaces the server membership of the default group with the given set and
+  * flushes the resulting configuration. No-op while the manager is offline,
+  * because flushing requires the group table to be available.
+  *
+  * @param hostPort the new complete server set for the default group
+  * @throws IOException if persisting the updated configuration fails
+  */
+ private synchronized void updateDefaultServers(
+     NavigableSet<HostPort> hostPort) throws IOException {
+   if (!isOnline()) {
+     LOG.info("Offline mode. Skipping update of default servers");
+     return;
+   }
+   GroupInfo currentDefault = groupMap.get(GroupInfo.DEFAULT_GROUP);
+   GroupInfo updatedDefault =
+       new GroupInfo(currentDefault.getName(), hostPort, currentDefault.getTables());
+   HashMap<String, GroupInfo> updatedGroups = Maps.newHashMap(groupMap);
+   updatedGroups.put(updatedDefault.getName(), updatedDefault);
+   flushConfig(updatedGroups);
+ }
+
+ // ServerListener callback: a region server joined the cluster. Wake the
+ // background updater so the (dynamic) default group membership is recomputed.
+ @Override
+ public void serverAdded(ServerName serverName) {
+ defaultServerUpdater.serverChanged();
+ }
+
+ // ServerListener callback: a region server left the cluster. Wake the
+ // background updater so the default group drops the departed server.
+ @Override
+ public void serverRemoved(ServerName serverName) {
+ defaultServerUpdater.serverChanged();
+ }
+
+ /**
+  * Background thread that keeps the default group's server list in sync with
+  * live cluster membership. Each time serverChanged() is signalled it
+  * recomputes the default server set and, when it differs from the previously
+  * published set, persists it via updateDefaultServers().
+  */
+ private static class DefaultServerUpdater extends Thread {
+   private static final Log LOG = LogFactory.getLog(DefaultServerUpdater.class);
+   private final GroupInfoManagerImpl mgr;
+   // Guarded by "this": set by serverChanged(), consumed by run().
+   private boolean hasChanged = false;
+
+   public DefaultServerUpdater(GroupInfoManagerImpl mgr) {
+     this.mgr = mgr;
+   }
+
+   @Override
+   public void run() {
+     List<HostPort> prevDefaultServers = new LinkedList<HostPort>();
+     // BUGFIX: this used "||", which only lets the loop exit once the master is
+     // BOTH aborted AND stopped; the thread must exit as soon as either holds
+     // (matches isMasterRunning() elsewhere in this file, which uses "&&").
+     while (!mgr.master.isAborted() && !mgr.master.isStopped()) {
+       try {
+         LOG.info("Updating default servers.");
+         List<HostPort> servers = mgr.getDefaultServers();
+         Collections.sort(servers);
+         if (!servers.equals(prevDefaultServers)) {
+           mgr.updateDefaultServers(new TreeSet<HostPort>(servers));
+           prevDefaultServers = servers;
+           LOG.info("Updated with servers: " + servers.size());
+         }
+         try {
+           synchronized (this) {
+             // Block until the next membership change; the flag guards against
+             // signals that arrived while we were updating above.
+             if (!hasChanged) {
+               wait();
+             }
+             hasChanged = false;
+           }
+         } catch (InterruptedException e) {
+           // BUGFIX: the interrupt was silently swallowed, making the thread
+           // unstoppable via interruption. Restore the flag and exit.
+           LOG.info("Default server updater interrupted, exiting", e);
+           Thread.currentThread().interrupt();
+           return;
+         }
+       } catch (IOException e) {
+         LOG.warn("Failed to update default servers", e);
+       }
+     }
+   }
+
+   /** Signals that cluster membership changed and the default group needs refreshing. */
+   public void serverChanged() {
+     synchronized (this) {
+       hasChanged = true;
+       this.notify();
+     }
+   }
+ }
+
+
+ /**
+  * Daemon thread run at startup: polls hbase:meta until the group table (and
+  * the namespace table) are online and servable, creating the group table when
+  * meta shows no regions for it, then primes the manager's caches and flips
+  * the manager online.
+  */
+ private static class GroupStartupWorker extends Thread {
+ private static final Log LOG = LogFactory.getLog(GroupStartupWorker.class);
+
+ private Configuration conf;
+ private volatile boolean isOnline = false;
+ private MasterServices masterServices;
+ private GroupInfoManagerImpl groupInfoManager;
+
+ public GroupStartupWorker(GroupInfoManagerImpl groupInfoManager,
+ MasterServices masterServices) {
+ this.conf = masterServices.getConfiguration();
+ this.masterServices = masterServices;
+ this.groupInfoManager = groupInfoManager;
+ setName(GroupStartupWorker.class.getName()+"-"+masterServices.getServerName());
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ if(waitForGroupTableOnline()) {
+ LOG.info("GroupBasedLoadBalancer is now online");
+ }
+ }
+
+ // Polls meta every 100ms until the group table has at least one region and
+ // every region found for it is verifiably servable by its assigned region
+ // server. Returns false only if the master stops/aborts before that happens.
+ public boolean waitForGroupTableOnline() {
+ final List<HRegionInfo> foundRegions = new LinkedList<HRegionInfo>();
+ final List<HRegionInfo> assignedRegions = new LinkedList<HRegionInfo>();
+ final AtomicBoolean found = new AtomicBoolean(false);
+ boolean createSent = false;
+
+ while (!found.get() && isMasterRunning()) {
+ foundRegions.clear();
+ assignedRegions.clear();
+ // Assume success; any failed check below flips this back to false.
+ found.set(true);
+ try {
+ // NOTE(review): this connection is never closed here; presumably
+ // getConnection returns a shared/cached connection -- verify no leak.
+ final HConnection conn = HConnectionManager.getConnection(conf);
+ boolean rootMetaFound =
+ masterServices.getCatalogTracker().verifyMetaRegionLocation(50);
+ final AtomicBoolean nsFound = new AtomicBoolean(false);
+ if (rootMetaFound) {
+ final ZKTable zkTable = new ZKTable(masterServices.getZooKeeper());
+ MetaScanner.MetaScannerVisitor visitor = new MetaScanner.MetaScannerVisitorBase() {
+ @Override
+ public boolean processRow(Result row) throws IOException {
+ HRegionInfo info = HRegionInfo.getHRegionInfo(row);
+ if (info != null) {
+ Cell serverCell =
+ row.getColumnLatestCell(HConstants.CATALOG_FAMILY,
+ HConstants.SERVER_QUALIFIER);
+ if (GROUP_TABLE_NAME.equals(info.getTable()) && serverCell != null) {
+ ServerName sn =
+ ServerName.parseVersionedServerName(CellUtil.cloneValue(serverCell));
+ if (sn == null) {
+ found.set(false);
+ } else if (zkTable.isEnabledTable(GROUP_TABLE_NAME)) {
+ // Probe the hosting RS with a real Get to prove the region is servable.
+ try {
+ ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn);
+ ProtobufUtil.get(rs, info.getRegionName(), new Get(ROW_KEY));
+ assignedRegions.add(info);
+ } catch(Exception ex) {
+ LOG.debug("Caught exception while verifying group region", ex);
+ }
+ }
+ foundRegions.add(info);
+ }
+ // Namespace table must be servable before the group table is created.
+ if (TableName.NAMESPACE_TABLE_NAME.equals(info.getTable())) {
+ Cell cell = row.getColumnLatestCell(HConstants.CATALOG_FAMILY,
+ HConstants.SERVER_QUALIFIER);
+ ServerName sn = null;
+ if(cell != null) {
+ sn = ServerName.parseVersionedServerName(CellUtil.cloneValue(cell));
+ }
+ if (zkTable.isEnabledTable(TableName.NAMESPACE_TABLE_NAME)) {
+ // NOTE(review): sn may still be null here, so conn.getClient(sn) can
+ // throw; the catch below swallows it -- confirm that is intended.
+ try {
+ ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn);
+ ProtobufUtil.get(rs, info.getRegionName(), new Get(ROW_KEY));
+ nsFound.set(true);
+ } catch(Exception ex) {
+ LOG.debug("Caught exception while verifying group region", ex);
+ }
+ }
+ }
+ }
+ return true;
+ }
+ };
+ MetaScanner.metaScan(conf, visitor);
+ // if no regions in meta then we have to create the table
+ if (foundRegions.size() < 1 && rootMetaFound && !createSent && nsFound.get()) {
+ groupInfoManager.createGroupTable(masterServices);
+ createSent = true;
+ }
+ LOG.info("Group table: " + GROUP_TABLE_NAME + " isOnline: " + found.get()
+ + ", regionCount: " + foundRegions.size() + ", assignCount: "
+ + assignedRegions.size() + ", rootMetaFound: "+rootMetaFound);
+ // Online only when every region seen in meta passed the servability probe.
+ found.set(found.get() && assignedRegions.size() == foundRegions.size()
+ && foundRegions.size() > 0);
+ } else {
+ LOG.info("Waiting for catalog tables to come online");
+ found.set(false);
+ }
+ if (found.get()) {
+ LOG.debug("With group table online, refreshing cached information.");
+ groupInfoManager.refresh(true);
+ isOnline = true;
+ //flush any inconsistencies between ZK and HTable
+ groupInfoManager.flushConfig(groupInfoManager.groupMap);
+ }
+ } catch(Exception e) {
+ found.set(false);
+ LOG.warn("Failed to perform check", e);
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ LOG.info("Sleep interrupted", e);
+ }
+ }
+ return found.get();
+ }
+
+ // Whether startup completed and the manager may serve reads and mutations.
+ public boolean isOnline() {
+ return isOnline;
+ }
+
+ private boolean isMasterRunning() {
+ return !masterServices.isAborted() && !masterServices.isStopped();
+ }
+ }
+
+ /**
+  * Creates the group table by submitting a CreateTableHandler directly to the
+  * executor service (bypassing HMaster.createTable and its checkInitialized
+  * gate, which would reject the call during startup), then polls until the
+  * single region is assigned or the ~60s budget (600 x 100ms) is exhausted.
+  *
+  * @param masterServices master services used to create and track the table
+  * @throws IOException if interrupted while waiting, or if the region is not
+  *           assigned within the timeout
+  */
+ private void createGroupTable(MasterServices masterServices) throws IOException {
+   HRegionInfo newRegions[] = new HRegionInfo[]{
+       new HRegionInfo(GROUP_TABLE_DESC.getTableName(), null, null)};
+   //we need to create the table this way to bypass
+   //checkInitialized
+   masterServices.getExecutorService()
+       .submit(new CreateTableHandler(
+           masterServices,
+           masterServices.getMasterFileSystem(),
+           GROUP_TABLE_DESC,
+           masterServices.getConfiguration(),
+           newRegions,
+           masterServices).prepare());
+   //wait for region to be online
+   int tries = 600;
+   while (masterServices.getAssignmentManager().getRegionStates()
+       .getRegionServerOfRegion(newRegions[0]) == null && tries > 0) {
+     try {
+       Thread.sleep(100);
+     } catch (InterruptedException e) {
+       // BUGFIX: restore the interrupt status before translating to
+       // IOException so callers can still observe the interruption.
+       Thread.currentThread().interrupt();
+       throw new IOException("Wait interrupted", e);
+     }
+     tries--;
+   }
+   if (tries <= 0) {
+     throw new IOException("Failed to create group table.");
+   }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/66e16163/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupSerDe.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupSerDe.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupSerDe.java
new file mode 100644
index 0000000..cf32647
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupSerDe.java
@@ -0,0 +1,88 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.group;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableMap;
+
+//TODO do better encapsulation of SerDe logic from GroupInfoManager and GroupTracker
+/**
+ * Serialization/deserialization helper for group metadata. Groups are stored
+ * in two places: as protobuf columns in the single configuration row of the
+ * group table, and as PB-magic-prefixed protobuf znodes under the group base
+ * path in ZooKeeper.
+ */
+public class GroupSerDe {
+  private static final Log LOG = LogFactory.getLog(GroupSerDe.class);
+
+  public GroupSerDe() {
+  }
+
+  /**
+   * Reads every group stored in the configuration row of the group table.
+   *
+   * @param groupTable handle to the group table
+   * @return the decoded groups; empty when the row does not exist yet
+   * @throws IOException if the read or protobuf decoding fails
+   */
+  public List<GroupInfo> retrieveGroupList(HTableInterface groupTable) throws IOException {
+    List<GroupInfo> groupInfoList = Lists.newArrayList();
+    Result result = groupTable.get(new Get(GroupInfoManager.ROW_KEY));
+    if (!result.isEmpty()) {
+      NavigableMap<byte[],NavigableMap<byte[],byte[]>> dataMap = result.getNoVersionMap();
+      // One column per group: qualifier = group name, value = serialized proto.
+      NavigableMap<byte[],byte[]> familyMap = dataMap.get(GroupInfoManager.META_FAMILY_BYTES);
+      for (byte[] groupName : familyMap.keySet()) {
+        RSGroupProtos.GroupInfo proto =
+            RSGroupProtos.GroupInfo.parseFrom(familyMap.get(groupName));
+        groupInfoList.add(ProtobufUtil.toGroupInfo(proto));
+      }
+    }
+    return groupInfoList;
+  }
+
+  /**
+   * Reads every group stored as a child znode of the group base path.
+   *
+   * @param watcher ZooKeeper watcher to read through (also sets child watches)
+   * @param groupBasePath base znode under which group znodes live
+   * @return the decoded groups; empty when the base path does not exist
+   * @throws IOException on ZooKeeper or deserialization failure
+   */
+  public List<GroupInfo> retrieveGroupList(ZooKeeperWatcher watcher,
+                                           String groupBasePath) throws IOException {
+    List<GroupInfo> groupInfoList = Lists.newArrayList();
+    //Overwrite any info stored by table, this takes precedence
+    try {
+      if (ZKUtil.checkExists(watcher, groupBasePath) != -1) {
+        for (String znode : ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath)) {
+          byte[] data = ZKUtil.getData(watcher, ZKUtil.joinZNode(groupBasePath, znode));
+          // BUGFIX: ZKUtil.getData returns null when the znode vanished between
+          // the children listing and the read; guard against the resulting NPE.
+          if (data != null && data.length > 0) {
+            ProtobufUtil.expectPBMagicPrefix(data);
+            ByteArrayInputStream bis = new ByteArrayInputStream(
+                data, ProtobufUtil.lengthOfPBMagic(), data.length);
+            groupInfoList.add(ProtobufUtil.toGroupInfo(RSGroupProtos.GroupInfo.parseFrom(bis)));
+          }
+        }
+        LOG.debug("Read ZK GroupInfo count:" + groupInfoList.size());
+      }
+    } catch (KeeperException e) {
+      throw new IOException("Failed to read groupZNode",e);
+    } catch (DeserializationException e) {
+      throw new IOException("Failed to read groupZNode",e);
+    }
+    return groupInfoList;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/66e16163/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupTracker.java
new file mode 100644
index 0000000..f0c0a8f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupTracker.java
@@ -0,0 +1,341 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.group;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HostPort;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * ZooKeeper-based tracker of the group configuration znode. Keeps in-memory
+ * snapshots of name -> GroupInfo and server -> GroupInfo that are rebuilt
+ * whenever the group znode, its children, or the set of live region servers
+ * changes, and notifies registered Listeners after every refresh.
+ */
+public class GroupTracker extends ZooKeeperNodeTracker {
+  private static final Log LOG = LogFactory.getLog(GroupTracker.class);
+
+  private List<Listener> listeners = Collections.synchronizedList(new ArrayList<Listener>());
+  private GroupSerDe groupSerDe = new GroupSerDe();
+  // Volatile snapshots swapped wholesale by refresh(); readers never observe a
+  // partially built map, though groupMap/serverMap may briefly disagree.
+  private volatile Map<String, GroupInfo> groupMap = new HashMap<String, GroupInfo>();
+  private volatile Map<HostPort, GroupInfo> serverMap = new HashMap<HostPort, GroupInfo>();
+  private RegionServerTracker rsTracker;
+  private volatile boolean started = false;
+
+  /**
+   * Constructs a new ZK node tracker.
+   * <p/>
+   * <p>After construction, use {@link #start} to kick off tracking.
+   *
+   * @param watcher ZooKeeper watcher
+   * @param abortable abort target; when null, a self-healing
+   *          PersistentAbortable is used that retries the refresh in the
+   *          background instead of aborting the process
+   */
+  public GroupTracker(ZooKeeperWatcher watcher, Abortable abortable) throws IOException {
+    //TODO make period configurable
+    super(watcher,
+        ZKUtil.joinZNode(watcher.baseZNode, GroupInfoManager.groupZNode),
+        abortable != null ? abortable : new PersistentAbortable(10000));
+    if (abortable == null) {
+      ((PersistentAbortable) this.abortable).setGroupTracker(this);
+    }
+    // NOTE(review): the raw (possibly null) abortable is passed here rather
+    // than this.abortable; nodeChildrenChanged would then NPE on a ZK failure
+    // when the caller supplied null -- TODO confirm intent.
+    rsTracker = new RegionServerTracker(watcher, abortable, this);
+    try {
+      ZKUtil.listChildrenAndWatchThem(watcher, node);
+      rsTracker.start();
+    } catch (KeeperException e) {
+      throw new IOException("Failed to start RS tracker", e);
+    }
+  }
+
+  public void addListener(Listener listener) {
+    listeners.add(listener);
+  }
+
+  public void removeListener(Listener listener) {
+    listeners.remove(listener);
+  }
+
+  @Override
+  public synchronized void start() {
+    super.start();
+    started = true;
+  }
+
+  @Override
+  public void nodeCreated(String path) {
+    if (path.equals(node)) {
+      refresh();
+    }
+  }
+
+  @Override
+  public void nodeDataChanged(String path) {
+    if (path.equals(node)) {
+      nodeCreated(path);
+    }
+  }
+
+  @Override
+  public void nodeChildrenChanged(String path) {
+    if (path.startsWith(node)) {
+      refresh();
+    }
+  }
+
+  /** Blocks until the group znode is available (or timeout) and primes the caches. */
+  public void blockUntilReady(int timeout) throws InterruptedException, IOException {
+    blockUntilAvailable(timeout, false);
+    if (getData(false) != null) {
+      refresh(false);
+    }
+  }
+
+  // Watcher-callback variant: failures are routed to the abortable rather than
+  // thrown, since ZK event threads cannot propagate IOException.
+  private void refresh() {
+    try {
+      refresh(false);
+    } catch (IOException e) {
+      this.abortable.abort("Failed to read group znode", e);
+    }
+  }
+
+  private synchronized void refresh(boolean force) throws IOException {
+    List<ServerName> onlineRS = rsTracker.getOnlineServers();
+    Set<HostPort> hostPorts = new HashSet<HostPort>();
+    for (ServerName entry : onlineRS) {
+      hostPorts.add(new HostPort(entry.getHostname(), entry.getPort()));
+    }
+    Map<String, GroupInfo> tmpGroupMap = new HashMap<String, GroupInfo>();
+    Map<HostPort, GroupInfo> tmpServerMap = new HashMap<HostPort, GroupInfo>();
+    for (GroupInfo groupInfo : listGroups()) {
+      tmpGroupMap.put(groupInfo.getName(), groupInfo);
+      for (HostPort server : groupInfo.getServers()) {
+        tmpServerMap.put(server, groupInfo);
+        hostPorts.remove(server);
+      }
+    }
+    // Servers not claimed by any explicit group fall into the default group.
+    // NOTE(review): assumes the default group is always present in ZK; if it is
+    // missing this NPEs and the abortable kicks in -- TODO confirm intended.
+    GroupInfo groupInfo = tmpGroupMap.get(GroupInfo.DEFAULT_GROUP);
+    groupInfo.addAllServers(hostPorts);
+    for (HostPort entry : hostPorts) {
+      tmpServerMap.put(entry, groupInfo);
+    }
+
+    //when reading sync on "this" if groupMap<->serverMap
+    //invariant needs to be guaranteed
+    groupMap = tmpGroupMap;
+    serverMap = tmpServerMap;
+
+    Map<String, GroupInfo> map = getGroupMap();
+    for (Listener listener : listeners) {
+      listener.groupMapChanged(map);
+    }
+  }
+
+  private List<GroupInfo> listGroups() throws IOException {
+    return groupSerDe.retrieveGroupList(watcher, node);
+  }
+
+  /** @return the group with the given name, or null if unknown */
+  public GroupInfo getGroup(String name) {
+    GroupInfo groupInfo = groupMap.get(name);
+    return groupInfo;
+  }
+
+  /**
+   * Looks up the group a server belongs to.
+   *
+   * @param hostPort server address in "host:port" form
+   * @return the server's group, or null when the server is unknown or the
+   *         argument is not parseable
+   */
+  public GroupInfo getGroupOfServer(String hostPort) {
+    // BUGFIX: serverMap is keyed by HostPort, so looking it up with the raw
+    // String always returned null. Parse the argument into a HostPort first.
+    int idx = hostPort.lastIndexOf(':');
+    if (idx < 0) {
+      return null;
+    }
+    try {
+      return serverMap.get(new HostPort(hostPort.substring(0, idx),
+          Integer.parseInt(hostPort.substring(idx + 1))));
+    } catch (NumberFormatException e) {
+      return null;
+    }
+  }
+
+  /** @return unmodifiable view of the current name -> group snapshot */
+  public Map<String, GroupInfo> getGroupMap() {
+    return Collections.unmodifiableMap(groupMap);
+  }
+
+  /** Callback interface notified after every successful refresh. */
+  public interface Listener {
+    public void groupMapChanged(Map<String, GroupInfo> groupMap);
+  }
+
+
+  /**
+   * This class is copied for RegionServerTracker
+   * We need our own since the other one was tied to ServerManager
+   * and thus the master
+   */
+  private static class RegionServerTracker extends ZooKeeperListener {
+    private static final Log LOG = LogFactory.getLog(RegionServerTracker.class);
+    private volatile List<ServerName> regionServers = new ArrayList<ServerName>();
+    private Abortable abortable;
+    private GroupTracker groupTracker;
+
+    public RegionServerTracker(ZooKeeperWatcher watcher,
+        Abortable abortable, GroupTracker groupTracker) {
+      super(watcher);
+      this.abortable = abortable;
+      this.groupTracker = groupTracker;
+    }
+
+    public void start() throws KeeperException, IOException {
+      watcher.registerListener(this);
+      refresh();
+    }
+
+    private void add(final List<String> servers) throws IOException {
+      List<ServerName> temp = new ArrayList<ServerName>();
+      for (String n : servers) {
+        ServerName sn = ServerName.parseServerName(ZKUtil.getNodeName(n));
+        temp.add(sn);
+      }
+      regionServers = temp;
+      //we're refreshing groups, since default membership
+      //is dynamic and new servers may end up as new default group members
+      refreshGroups();
+    }
+
+    private void remove(final ServerName sn) {
+      List<ServerName> temp = new ArrayList<ServerName>();
+      for (ServerName el : regionServers) {
+        if (!sn.equals(el)) {
+          temp.add(el);
+        }
+      }
+      regionServers = temp;
+      refreshGroups();
+    }
+
+    private void refreshGroups() {
+      if (groupTracker.started && groupTracker.getData(false) != null) {
+        groupTracker.refresh();
+      }
+    }
+
+    public void refresh() throws KeeperException, IOException {
+      List<String> servers =
+          ZKUtil.listChildrenAndWatchThem(watcher, watcher.rsZNode);
+      add(servers);
+    }
+
+    @Override
+    public void nodeDeleted(String path) {
+      if (path.startsWith(watcher.rsZNode)) {
+        String serverName = ZKUtil.getNodeName(path);
+        LOG.info("RegionServer ephemeral node deleted, processing expiration [" +
+            serverName + "]");
+        ServerName sn = ServerName.parseServerName(serverName);
+        remove(sn);
+      }
+    }
+
+    @Override
+    public void nodeChildrenChanged(String path) {
+      if (path.equals(watcher.rsZNode)) {
+        try {
+          List<String> servers =
+              ZKUtil.listChildrenAndWatchThem(watcher, watcher.rsZNode);
+          add(servers);
+        } catch (IOException e) {
+          abortable.abort("Unexpected zk exception getting RS nodes", e);
+        } catch (KeeperException e) {
+          abortable.abort("Unexpected zk exception getting RS nodes", e);
+        }
+      }
+    }
+
+    /**
+     * Gets the online servers.
+     * @return list of online servers
+     */
+    public List<ServerName> getOnlineServers() {
+      return regionServers;
+    }
+  }
+
+  /**
+   * Daemon thread that retries a full refresh (RS list + groups) until it
+   * succeeds; used by PersistentAbortable to recover from transient ZK
+   * failures instead of aborting the process.
+   */
+  private static class Refresher extends Thread {
+    private final static Log LOG = LogFactory.getLog(Refresher.class);
+    private GroupTracker groupTracker;
+    private volatile boolean isRunning = true;
+    private int period;
+
+    public Refresher(GroupTracker groupTracker, int period) {
+      this.groupTracker = groupTracker;
+      this.period = period;
+      this.setDaemon(true);
+    }
+
+    public boolean isRunning() {
+      return isRunning;
+    }
+
+    @Override
+    public void run() {
+      while (true) {
+        try {
+          groupTracker.rsTracker.refresh();
+          groupTracker.refresh(true);
+          LOG.info("Recovery refresh successful");
+          isRunning = false;
+          return;
+        } catch (IOException e) {
+          LOG.warn("Failed to refresh", e);
+        } catch (KeeperException e) {
+          LOG.warn("Failed to refresh", e);
+        }
+        try {
+          Thread.sleep(period);
+        } catch (InterruptedException e) {
+          // BUGFIX: the interrupt was swallowed; restore it, mark this
+          // refresher done so a new one can be launched, and stop retrying.
+          Thread.currentThread().interrupt();
+          isRunning = false;
+          return;
+        }
+      }
+    }
+  }
+
+  /**
+   * Abortable that never actually aborts the process: instead it launches a
+   * background Refresher that retries until tracker state is consistent again.
+   */
+  private static class PersistentAbortable implements Abortable {
+    private final Log LOG = LogFactory.getLog(Abortable.class);
+    private Refresher refresher;
+    private GroupTracker groupTracker;
+    private int period;
+
+
+    public PersistentAbortable(int period) {
+      this.period = period;
+    }
+
+    public void setGroupTracker(GroupTracker groupTracker) {
+      this.groupTracker = groupTracker;
+    }
+
+    @Override
+    public void abort(String why, Throwable e) {
+      LOG.warn("Launching refresher because of abort: " + why, e);
+      if (refresher == null || !refresher.isRunning()) {
+        refresher = new Refresher(groupTracker, period);
+        // BUGFIX: the Refresher thread was constructed but never started, so
+        // recovery never actually ran.
+        refresher.start();
+      }
+    }
+
+    @Override
+    public boolean isAborted() {
+      return false;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/66e16163/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupableBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupableBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupableBalancer.java
new file mode 100644
index 0000000..e696926
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupableBalancer.java
@@ -0,0 +1,12 @@
+package org.apache.hadoop.hbase.group;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.master.LoadBalancer;
+
+import java.io.IOException;
+
+// NOTE(review): this new file is missing the Apache license header that the
+// sibling files in this commit carry -- add before commit.
+/**
+ * A {@link LoadBalancer} that is aware of region server groups. The group
+ * info manager is injected after construction via
+ * {@link #setGroupInfoManager(GroupInfoManager)}.
+ */
+@InterfaceAudience.Private
+public interface GroupableBalancer extends LoadBalancer {
+
+ void setGroupInfoManager(GroupInfoManager groupInfoManager) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/66e16163/hbase-server/src/main/java/org/apache/hadoop/hbase/group/MXBean.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/group/MXBean.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/MXBean.java
new file mode 100644
index 0000000..6ccd0ab
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/MXBean.java
@@ -0,0 +1,71 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.group;
+
+import org.apache.hadoop.hbase.HostPort;
+import org.apache.hadoop.hbase.TableName;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * JMX interface exposing region server group membership to monitoring tools.
+ */
+public interface MXBean {
+
+ /** @return map of group name to the online servers currently in that group */
+ public Map<String, List<HostPort>> getServersByGroup() throws IOException;
+
+ /** @return one bean per group, including servers that are configured but offline */
+ public List<GroupInfoBean> getGroups() throws IOException;
+
+ /**
+  * JMX-friendly view of a single group, with List fields in place of the
+  * NavigableSet collections used by GroupInfo.
+  */
+ public static class GroupInfoBean {
+
+ private String name;
+ private List<HostPort> servers;
+ private List<TableName> tables;
+ private List<HostPort> offlineServers;
+
+ //Need this to convert NavigableSet to List
+ public GroupInfoBean(GroupInfo groupInfo, List<HostPort> offlineServers) {
+ this.name = groupInfo.getName();
+ this.offlineServers = offlineServers;
+ this.servers = new LinkedList<HostPort>();
+ this.servers.addAll(groupInfo.getServers());
+ this.tables = new LinkedList<TableName>();
+ this.tables.addAll(groupInfo.getTables());
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public List<HostPort> getServers() {
+ return servers;
+ }
+
+ /** @return servers assigned to this group that were not online at construction time */
+ public List<HostPort> getOfflineServers() {
+ return offlineServers;
+ }
+
+ public List<TableName> getTables() {
+ return tables;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/66e16163/hbase-server/src/main/java/org/apache/hadoop/hbase/group/MXBeanImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/group/MXBeanImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/MXBeanImpl.java
new file mode 100644
index 0000000..5836d2d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/MXBeanImpl.java
@@ -0,0 +1,95 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.group;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HostPort;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.MasterServices;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * JMX bean exposing region server group information, maintained as a
+ * lazily-created singleton via {@link #init}.
+ */
+public class MXBeanImpl implements MXBean {
+  private static final Log LOG = LogFactory.getLog(MXBeanImpl.class);
+
+  private static MXBeanImpl instance = null;
+
+  private GroupAdmin groupAdmin;
+  private MasterServices master;
+
+  /** Returns the singleton, creating it on first call; later arguments are ignored. */
+  public synchronized static MXBeanImpl init(
+      final GroupAdmin groupAdmin,
+      MasterServices master) {
+    if (instance == null) {
+      instance = new MXBeanImpl(groupAdmin, master);
+    }
+    return instance;
+  }
+
+  protected MXBeanImpl(final GroupAdmin groupAdmin,
+      MasterServices master) {
+    this.groupAdmin = groupAdmin;
+    this.master = master;
+  }
+
+  /** @return map of group name to the online servers currently in that group */
+  @Override
+  public Map<String, List<HostPort>> getServersByGroup() throws IOException {
+    Map<String, List<HostPort>> data = new HashMap<String, List<HostPort>>();
+    for (final ServerName entry :
+        master.getServerManager().getOnlineServersList()) {
+      // NOTE(review): assumes every online server resolves to a group (the
+      // default group catches strays); a null here would NPE -- TODO confirm.
+      GroupInfo groupInfo = groupAdmin.getGroupOfServer(
+          new HostPort(entry.getHostname(), entry.getPort()));
+      if (!data.containsKey(groupInfo.getName())) {
+        data.put(groupInfo.getName(), new LinkedList<HostPort>());
+      }
+      data.get(groupInfo.getName()).add(entry.getHostPort());
+    }
+    return data;
+  }
+
+  /**
+   * @return one bean per group, reporting the group's configured-but-offline
+   *         servers separately from its full server list
+   */
+  @Override
+  public List<GroupInfoBean> getGroups() throws IOException {
+    Set<HostPort> onlineServers = Sets.newHashSet();
+    for (ServerName entry : master.getServerManager().getOnlineServersList()) {
+      onlineServers.add(new HostPort(entry.getHostname(), entry.getPort()));
+    }
+    // BUGFIX: was a raw List; parameterize with the element type.
+    List<GroupInfoBean> list = Lists.newArrayList();
+    for (GroupInfo group : groupAdmin.listGroups()) {
+      List<HostPort> offlineServers = Lists.newArrayList();
+      for (HostPort server : group.getServers()) {
+        if (!onlineServers.contains(server)) {
+          offlineServers.add(server);
+        }
+      }
+      list.add(new GroupInfoBean(group, offlineServers));
+    }
+    return list;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/66e16163/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 6df721b..f95dceb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -2113,7 +2113,7 @@ public class AssignmentManager extends ZooKeeperListener {
}
}
LOG.info("Assigning " + region.getRegionNameAsString() +
- " to " + plan.getDestination().toString());
+ " to " + plan.getDestination());
// Transition RegionState to PENDING_OPEN
currentState = regionStates.updateRegionState(region,
State.PENDING_OPEN, plan.getDestination());
@@ -2402,8 +2402,13 @@ public class AssignmentManager extends ZooKeeperListener {
|| existingPlan.getDestination() == null
|| !destServers.contains(existingPlan.getDestination())) {
newPlan = true;
- randomPlan = new RegionPlan(region, null,
- balancer.randomAssignment(region, destServers));
+ try {
+ randomPlan = new RegionPlan(region, null,
+ balancer.randomAssignment(region, destServers));
+ } catch (IOException ex) {
+ LOG.warn("Failed to create new plan.",ex);
+ return null;
+ }
if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
regions.add(region);
@@ -2690,6 +2695,19 @@ public class AssignmentManager extends ZooKeeperListener {
Map<ServerName, List<HRegionInfo>> bulkPlan =
balancer.retainAssignment(regions, servers);
+ if (bulkPlan.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
+ // Found no plan for some regions, put those regions in RIT
+ for (HRegionInfo hri : bulkPlan.get(LoadBalancer.BOGUS_SERVER_NAME)) {
+ if (tomActivated) {
+ // Set to offline so that tom will handle it
+ regionStates.updateRegionState(hri, State.OFFLINE);
+ } else {
+ regionStates.updateRegionState(hri, State.FAILED_OPEN);
+ }
+ }
+ bulkPlan.remove(LoadBalancer.BOGUS_SERVER_NAME);
+ }
+
assign(regions.size(), servers.size(),
"retainAssignment=true", bulkPlan);
}
@@ -2716,6 +2734,20 @@ public class AssignmentManager extends ZooKeeperListener {
// Generate a round-robin bulk assignment plan
Map<ServerName, List<HRegionInfo>> bulkPlan
= balancer.roundRobinAssignment(regions, servers);
+
+ if (bulkPlan.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
+ // Found no plan for some regions, put those regions in RIT
+ for (HRegionInfo hri : bulkPlan.get(LoadBalancer.BOGUS_SERVER_NAME)) {
+ if (tomActivated) {
+ // Set to offline so that tom will handle it
+ regionStates.updateRegionState(hri, State.OFFLINE);
+ } else {
+ regionStates.updateRegionState(hri, State.FAILED_OPEN);
+ }
+ }
+ bulkPlan.remove(LoadBalancer.BOGUS_SERVER_NAME);
+ }
+
processFavoredNodes(regions);
assign(regions.size(), servers.size(),