Posted to commits@hbase.apache.org by ap...@apache.org on 2015/08/01 18:36:15 UTC

[3/6] hbase git commit: HBASE-6721 RegionServer group based assignment (Francis Liu)

http://git-wip-us.apache.org/repos/asf/hbase/blob/16f65bad/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupBasedLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupBasedLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupBasedLoadBalancer.java
new file mode 100644
index 0000000..ae5fcac
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupBasedLoadBalancer.java
@@ -0,0 +1,433 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.group;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HostPort;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.LoadBalancer;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
+import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * GroupBasedLoadBalancer, used when Region Server Grouping is configured (HBASE-6721).
+ * It balances regions based on a table's group membership.
+ *
+ * Most assignment methods contain two exclusive code paths: Online - when the group
+ * table is online and Offline - when it is unavailable.
+ *
+ * During Offline, assignments are made based on cached group information in zookeeper.
+ * If that is unavailable (i.e. during bootstrap) regions are assigned randomly.
+ *
+ * Once the GROUP table has been assigned, the balancer switches to Online and will then
+ * start providing appropriate assignments for user tables.
+ *
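+ * As an illustrative sketch (hbase.master.loadbalancer.class is the standard
+ * master balancer plug-point, assumed here rather than defined in this patch),
+ * the balancer could be enabled with:
+ * <pre>
+ *   Configuration conf = HBaseConfiguration.create();
+ *   conf.set("hbase.master.loadbalancer.class",
+ *       "org.apache.hadoop.hbase.group.GroupBasedLoadBalancer");
+ * </pre>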
+ */
+@InterfaceAudience.Public
+public class GroupBasedLoadBalancer implements GroupableBalancer, LoadBalancer {
+  /** Config for pluggable load balancers */
+  public static final String HBASE_GROUP_LOADBALANCER_CLASS = "hbase.group.grouploadbalancer.class";
+
+  private static final Log LOG = LogFactory.getLog(GroupBasedLoadBalancer.class);
+
+  private Configuration config;
+  private ClusterStatus clusterStatus;
+  private MasterServices masterServices;
+  private GroupInfoManager groupManager;
+  private LoadBalancer internalBalancer;
+
+  //Instantiated via reflection by LoadBalancerFactory
+  @InterfaceAudience.Private
+  public GroupBasedLoadBalancer() {
+  }
+
+  //This constructor should only be used for unit testing
+  @InterfaceAudience.Private
+  public GroupBasedLoadBalancer(GroupInfoManager groupManager) {
+    this.groupManager = groupManager;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return config;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.config = conf;
+  }
+
+  @Override
+  public void setClusterStatus(ClusterStatus st) {
+    this.clusterStatus = st;
+  }
+
+  @Override
+  public void setMasterServices(MasterServices masterServices) {
+    this.masterServices = masterServices;
+  }
+
+  @Override
+  public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState)
+      throws HBaseIOException {
+
+    if (!isOnline()) {
+      throw new IllegalStateException(GroupInfoManager.GROUP_TABLE_NAME+
+          " is not online, unable to perform balance");
+    }
+
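+    //Balance each group independently: give the internal balancer a
+    //per-group view of the corrected cluster state and collect its plans.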
+    Map<ServerName,List<HRegionInfo>> correctedState = correctAssignments(clusterState);
+    List<RegionPlan> regionPlans = new ArrayList<RegionPlan>();
+    try {
+      for (GroupInfo info : groupManager.listGroups()) {
+        Map<ServerName, List<HRegionInfo>> groupClusterState = new HashMap<ServerName, List<HRegionInfo>>();
+        for (HostPort sName : info.getServers()) {
+          for(ServerName curr: clusterState.keySet()) {
+            if(curr.getHostPort().equals(sName)) {
+              groupClusterState.put(curr, correctedState.get(curr));
+            }
+          }
+        }
+        List<RegionPlan> groupPlans = this.internalBalancer
+            .balanceCluster(groupClusterState);
+        if (groupPlans != null) {
+          regionPlans.addAll(groupPlans);
+        }
+      }
+    } catch (IOException exp) {
+      LOG.warn("Exception while balancing cluster.", exp);
+      regionPlans.clear();
+    }
+    return regionPlans;
+  }
+
+  @Override
+  public Map<ServerName, List<HRegionInfo>> roundRobinAssignment (
+      List<HRegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
+    Map<ServerName, List<HRegionInfo>> assignments = Maps.newHashMap();
+    ListMultimap<String,HRegionInfo> regionMap = LinkedListMultimap.create();
+    ListMultimap<String,ServerName> serverMap = LinkedListMultimap.create();
+    generateGroupMaps(regions, servers, regionMap, serverMap);
+    for(String groupKey : regionMap.keySet()) {
+      if (regionMap.get(groupKey).size() > 0) {
+        Map<ServerName, List<HRegionInfo>> result =
+            this.internalBalancer.roundRobinAssignment(
+                regionMap.get(groupKey),
+                serverMap.get(groupKey));
+        if(result != null) {
+          assignments.putAll(result);
+        }
+      }
+    }
+    return assignments;
+  }
+
+  @Override
+  public Map<ServerName, List<HRegionInfo>> retainAssignment(
+      Map<HRegionInfo, ServerName> regions, List<ServerName> servers) throws HBaseIOException {
+    if (!isOnline()) {
+      return offlineRetainAssignment(regions, servers);
+    }
+    return onlineRetainAssignment(regions, servers);
+  }
+
+  public Map<ServerName, List<HRegionInfo>> offlineRetainAssignment(
+      Map<HRegionInfo, ServerName> regions, List<ServerName> servers) throws HBaseIOException {
+      //We will just keep assignments even if they are incorrect.
+      //Chances are most will be assigned correctly.
+      //Then we just use balance to correct the misplaced few.
+      //we need to correct catalog and group table assignment anyway.
+      return internalBalancer.retainAssignment(regions, servers);
+  }
+
+  public Map<ServerName, List<HRegionInfo>> onlineRetainAssignment(
+      Map<HRegionInfo, ServerName> regions, List<ServerName> servers) throws HBaseIOException {
+    try {
+      Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<ServerName, List<HRegionInfo>>();
+      ListMultimap<String, HRegionInfo> groupToRegion = ArrayListMultimap.create();
+      List<HRegionInfo> misplacedRegions = getMisplacedRegions(regions);
+      for (HRegionInfo region : regions.keySet()) {
+        if (!misplacedRegions.contains(region)) {
+          String groupName = groupManager.getGroupOfTable(region.getTable());
+          groupToRegion.put(groupName, region);
+        }
+      }
+      // Now the "groupToRegion" map has only the regions which have correct
+      // assignments.
+      for (String key : groupToRegion.keySet()) {
+        Map<HRegionInfo, ServerName> currentAssignmentMap = new TreeMap<HRegionInfo, ServerName>();
+        List<HRegionInfo> regionList = groupToRegion.get(key);
+        GroupInfo info = groupManager.getGroup(key);
+        List<ServerName> candidateList = filterOfflineServers(info, servers);
+        for (HRegionInfo region : regionList) {
+          currentAssignmentMap.put(region, regions.get(region));
+        }
+        assignments.putAll(this.internalBalancer.retainAssignment(
+            currentAssignmentMap, candidateList));
+      }
+
+      for (HRegionInfo region : misplacedRegions) {
+        String groupName = groupManager.getGroupOfTable(
+            region.getTable());
+        GroupInfo info = groupManager.getGroup(groupName);
+        List<ServerName> candidateList = filterOfflineServers(info, servers);
+        ServerName server = this.internalBalancer.randomAssignment(region,
+            candidateList);
+        if (server != null) {
+          if (!assignments.containsKey(server)) {
+            assignments.put(server, new ArrayList<HRegionInfo>());
+          }
+          assignments.get(server).add(region);
+        } else {
+          //if no server is available, assign to bogus so it ends up in RIT
+          if(!assignments.containsKey(BOGUS_SERVER_NAME)) {
+            assignments.put(BOGUS_SERVER_NAME, new ArrayList<HRegionInfo>());
+          }
+          assignments.get(BOGUS_SERVER_NAME).add(region);
+        }
+      }
+      return assignments;
+    } catch (IOException e) {
+      throw new HBaseIOException("Failed to do online retain assignment", e);
+    }
+  }
+
+  @Override
+  public Map<HRegionInfo, ServerName> immediateAssignment(
+      List<HRegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
+    Map<HRegionInfo,ServerName> assignments = Maps.newHashMap();
+    ListMultimap<String,HRegionInfo> regionMap = LinkedListMultimap.create();
+    ListMultimap<String,ServerName> serverMap = LinkedListMultimap.create();
+    generateGroupMaps(regions, servers, regionMap, serverMap);
+    for(String groupKey : regionMap.keySet()) {
+      if (regionMap.get(groupKey).size() > 0) {
+        assignments.putAll(
+            this.internalBalancer.immediateAssignment(
+                regionMap.get(groupKey),
+                serverMap.get(groupKey)));
+      }
+    }
+    return assignments;
+  }
+
+  @Override
+  public ServerName randomAssignment(HRegionInfo region,
+      List<ServerName> servers) throws HBaseIOException {
+    ListMultimap<String,HRegionInfo> regionMap = LinkedListMultimap.create();
+    ListMultimap<String,ServerName> serverMap = LinkedListMultimap.create();
+    generateGroupMaps(Lists.newArrayList(region), servers, regionMap, serverMap);
+    List<ServerName> filteredServers = serverMap.get(regionMap.keySet().iterator().next());
+    return this.internalBalancer.randomAssignment(region, filteredServers);
+  }
+
+  private void generateGroupMaps(
+    List<HRegionInfo> regions,
+    List<ServerName> servers,
+    ListMultimap<String, HRegionInfo> regionMap,
+    ListMultimap<String, ServerName> serverMap) throws HBaseIOException {
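+    //Bucket regions and candidate servers by group name; a group with no
+    //live servers is mapped to BOGUS_SERVER_NAME so its regions end up in RIT.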
+    try {
+      for (HRegionInfo region : regions) {
+        String groupName = groupManager.getGroupOfTable(region.getTable());
+        if(groupName == null) {
+          LOG.warn("Group for table "+region.getTable()+" is null");
+        }
+        regionMap.put(groupName, region);
+      }
+      for (String groupKey : regionMap.keySet()) {
+        GroupInfo info = groupManager.getGroup(groupKey);
+        serverMap.putAll(groupKey, filterOfflineServers(info, servers));
+        if(serverMap.get(groupKey).size() < 1) {
+          serverMap.put(groupKey, BOGUS_SERVER_NAME);
+        }
+      }
+    } catch(IOException e) {
+      throw new HBaseIOException("Failed to generate group maps", e);
+    }
+  }
+
+  private List<ServerName> filterOfflineServers(GroupInfo groupInfo,
+                                                List<ServerName> onlineServers) {
+    if (groupInfo != null) {
+      return filterServers(groupInfo.getServers(), onlineServers);
+    } else {
+      LOG.debug("Group Information found to be null. Some regions might be unassigned.");
+      return Collections.emptyList();
+    }
+  }
+
+  /**
+   * Filter servers, keeping only those present in the online list.
+   *
+   * @param servers the candidate servers
+   * @param onlineServers list of servers which are online
+   * @return the list of candidate servers that are online
+   */
+  private List<ServerName> filterServers(Collection<HostPort> servers,
+      Collection<ServerName> onlineServers) {
+    ArrayList<ServerName> finalList = new ArrayList<ServerName>();
+    for (HostPort server : servers) {
+      for(ServerName curr: onlineServers) {
+        if(curr.getHostPort().equals(server)) {
+          finalList.add(curr);
+        }
+      }
+    }
+    return finalList;
+  }
+
+  private ListMultimap<String, HRegionInfo> groupRegions(
+      List<HRegionInfo> regionList) throws IOException {
+    ListMultimap<String, HRegionInfo> regionGroup = ArrayListMultimap
+        .create();
+    for (HRegionInfo region : regionList) {
+      String groupName = groupManager.getGroupOfTable(region.getTable());
+      regionGroup.put(groupName, region);
+    }
+    return regionGroup;
+  }
+
+  private List<HRegionInfo> getMisplacedRegions(
+      Map<HRegionInfo, ServerName> regions) throws IOException {
+    List<HRegionInfo> misplacedRegions = new ArrayList<HRegionInfo>();
+    for (HRegionInfo region : regions.keySet()) {
+      ServerName assignedServer = regions.get(region);
+      GroupInfo info = groupManager.getGroup(groupManager.getGroupOfTable(region.getTable()));
+      if (assignedServer != null &&
+          (info == null || !info.containsServer(assignedServer.getHostPort()))) {
+        LOG.warn("Found misplaced region: "+region.getRegionNameAsString()+
+            " on server: "+assignedServer+
+            " found in group: "+groupManager.getGroupOfServer(assignedServer.getHostPort())+
+            " outside of group: "+info.getName());
+        misplacedRegions.add(region);
+      }
+    }
+    return misplacedRegions;
+  }
+
+  private Map<ServerName, List<HRegionInfo>> correctAssignments(
+       Map<ServerName, List<HRegionInfo>> existingAssignments){
+    Map<ServerName, List<HRegionInfo>> correctAssignments = new TreeMap<ServerName, List<HRegionInfo>>();
+    List<HRegionInfo> misplacedRegions = new LinkedList<HRegionInfo>();
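+    //A region is misplaced when its hosting server is not a member of the
+    //group that its table belongs to.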
+    for (ServerName sName : existingAssignments.keySet()) {
+      correctAssignments.put(sName, new LinkedList<HRegionInfo>());
+      List<HRegionInfo> regions = existingAssignments.get(sName);
+      for (HRegionInfo region : regions) {
+        GroupInfo info = null;
+        try {
+          info = groupManager.getGroup(groupManager.getGroupOfTable(region.getTable()));
+        }catch(IOException exp){
+          LOG.debug("Group information null for region of table " + region.getTable(),
+              exp);
+        }
+        if ((info == null) || (!info.containsServer(sName.getHostPort()))) {
+          // Misplaced region.
+          misplacedRegions.add(region);
+        } else {
+          correctAssignments.get(sName).add(region);
+        }
+      }
+    }
+
+    //TODO bulk unassign?
+    //unassign misplaced regions, so that they are assigned to correct groups.
+    for(HRegionInfo info: misplacedRegions) {
+      this.masterServices.getAssignmentManager().unassign(info);
+    }
+    return correctAssignments;
+  }
+
+  @Override
+  public void initialize() throws HBaseIOException {
+    // Create the balancer
+    Class<? extends LoadBalancer> balancerKlass = config.getClass(
+        HBASE_GROUP_LOADBALANCER_CLASS,
+        StochasticLoadBalancer.class, LoadBalancer.class);
+    internalBalancer = ReflectionUtils.newInstance(balancerKlass, config);
+    internalBalancer.setClusterStatus(clusterStatus);
+    internalBalancer.setMasterServices(masterServices);
+    internalBalancer.setConf(config);
+    internalBalancer.initialize();
+  }
+
+  public boolean isOnline() {
+    return groupManager != null && groupManager.isOnline();
+  }
+
+  @InterfaceAudience.Private
+  public GroupInfoManager getGroupInfoManager() throws IOException {
+    return groupManager;
+  }
+
+  @Override
+  public void regionOnline(HRegionInfo regionInfo, ServerName sn) {
+  }
+
+  @Override
+  public void regionOffline(HRegionInfo regionInfo) {
+  }
+
+  @Override
+  public void onConfigurationChange(Configuration conf) {
+    //Do nothing for now
+  }
+
+  @Override
+  public void stop(String why) {
+  }
+
+  @Override
+  public boolean isStopped() {
+    return false;
+  }
+
+  @Override
+  public void setGroupInfoManager(GroupInfoManager groupInfoManager) throws IOException {
+    this.groupManager = groupInfoManager;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/16f65bad/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupInfoManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupInfoManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupInfoManager.java
new file mode 100644
index 0000000..4ed7fa8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupInfoManager.java
@@ -0,0 +1,129 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.group;
+
+import org.apache.hadoop.hbase.HostPort;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Interface used to manage GroupInfo storage. An implementation
+ * has the option to support offline mode.
+ * See {@link GroupBasedLoadBalancer}
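+ *
+ * <p>A sketch of typical administrative usage (illustrative only; the
+ * single-argument GroupInfo constructor and the pre-built server/table
+ * sets are assumptions, not part of this API):
+ * <pre>
+ *   GroupInfoManager manager = ...; // obtained from the master
+ *   manager.addGroup(new GroupInfo("batch"));
+ *   manager.moveServers(hostPortSet, GroupInfo.DEFAULT_GROUP, "batch");
+ *   manager.moveTables(tableNameSet, "batch");
+ * </pre>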
+ */
+public interface GroupInfoManager {
+  //Assigned before user tables
+  public static final TableName GROUP_TABLE_NAME =
+      TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR,"rsgroup");
+  public static final byte[] GROUP_TABLE_NAME_BYTES = GROUP_TABLE_NAME.toBytes();
+  public static final String groupZNode = "groupInfo";
+  public static final byte[] META_FAMILY_BYTES = Bytes.toBytes("m");
+  public static final byte[] ROW_KEY = {0};
+
+
+  /**
+   * Adds the group.
+   *
+   * @param groupInfo the group information
+   * @throws java.io.IOException Signals that an I/O exception has occurred.
+   */
+  void addGroup(GroupInfo groupInfo) throws IOException;
+
+  /**
+   * Remove a region server group.
+   *
+   * @param groupName the group name
+   * @throws java.io.IOException Signals that an I/O exception has occurred.
+   */
+  void removeGroup(String groupName) throws IOException;
+
+  /**
+   * Move servers to a new group.
+   * @param hostPorts list of servers, must be part of the same group
+   * @param srcGroup the group the servers are moving from
+   * @param dstGroup the group the servers are moving to
+   * @return true if move was successful
+   * @throws java.io.IOException
+   */
+  boolean moveServers(Set<HostPort> hostPorts, String srcGroup, String dstGroup) throws IOException;
+
+  /**
+   * Gets the group info of a server.
+   *
+   * @param hostPort the server
+   * @return An instance of GroupInfo
+   */
+  GroupInfo getGroupOfServer(HostPort hostPort) throws IOException;
+
+  /**
+   * Gets the group information.
+   *
+   * @param groupName the group name
+   * @return An instance of GroupInfo
+   */
+  GroupInfo getGroup(String groupName) throws IOException;
+
+  /**
+   * Get the group membership of a table
+   * @param tableName the table name
+   * @return Group name of table
+   * @throws java.io.IOException
+   */
+  String getGroupOfTable(TableName tableName) throws IOException;
+
+  /**
+   * Set the group membership of a set of tables
+   *
+   * @param tableNames the tables to move
+   * @param groupName the destination group name
+   * @throws java.io.IOException
+   */
+  void moveTables(Set<TableName> tableNames, String groupName) throws IOException;
+
+  /**
+   * List the groups
+   *
+   * @return list of GroupInfo
+   * @throws java.io.IOException
+   */
+  List<GroupInfo> listGroups() throws IOException;
+
+  /**
+   * Refresh/reload the group information from
+   * the persistent store
+   *
+   * @throws java.io.IOException
+   */
+  void refresh() throws IOException;
+
+  /**
+   * Whether the manager is able to fully
+   * return group metadata
+   *
+   * @return true if the manager is online and can serve group metadata
+   */
+  boolean isOnline();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/16f65bad/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupInfoManagerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupInfoManagerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupInfoManagerImpl.java
new file mode 100644
index 0000000..a0df353
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupInfoManagerImpl.java
@@ -0,0 +1,702 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.group;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import com.google.protobuf.ByteString;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HostPort;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.MetaTableAccessor.DefaultVisitorBase;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.ServerListener;
+import org.apache.hadoop.hbase.master.TableStateManager;
+import org.apache.hadoop.hbase.master.handler.CreateTableHandler;
+import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
+import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
+import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This is an implementation of {@link GroupInfoManager} which makes
+ * use of an HBase table as the persistence store for the group information.
+ * It also makes use of zookeeper to store group information needed
+ * for bootstrapping during offline mode.
+ */
+public class GroupInfoManagerImpl implements GroupInfoManager, ServerListener {
+  private static final Log LOG = LogFactory.getLog(GroupInfoManagerImpl.class);
+
+  /** Table descriptor for <code>hbase:rsgroup</code> catalog table */
+  private final static HTableDescriptor GROUP_TABLE_DESC;
+  static {
+    GROUP_TABLE_DESC = new HTableDescriptor(GROUP_TABLE_NAME_BYTES);
+    GROUP_TABLE_DESC.addFamily(new HColumnDescriptor(META_FAMILY_BYTES));
+    GROUP_TABLE_DESC.setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
+  }
+
+  //Mutations must be synchronized; reads see whole-map snapshots via the volatile references.
+  private volatile Map<String, GroupInfo> groupMap;
+  private volatile Map<TableName, String> tableMap;
+  private MasterServices master;
+  private Table groupTable;
+  private ClusterConnection conn;
+  private ZooKeeperWatcher watcher;
+  private GroupStartupWorker groupStartupWorker;
+  //contains list of groups that were last flushed to persistent store
+  private volatile Set<String> prevGroups;
+  private GroupSerDe groupSerDe;
+  private DefaultServerUpdater defaultServerUpdater;
+
+
+  public GroupInfoManagerImpl(MasterServices master) throws IOException {
+    this.groupMap = Collections.emptyMap();
+    this.tableMap = Collections.emptyMap();
+    groupSerDe = new GroupSerDe();
+    this.master = master;
+    this.watcher = master.getZooKeeper();
+    this.conn = master.getConnection();
+    groupStartupWorker = new GroupStartupWorker(this, master, conn);
+    prevGroups = new HashSet<String>();
+    refresh();
+    groupStartupWorker.start();
+    defaultServerUpdater = new DefaultServerUpdater(this);
+    master.getServerManager().registerListener(this);
+    defaultServerUpdater.start();
+  }
+
+  /**
+   * Adds the group.
+   *
+   * @param groupInfo the group information
+   */
+  @Override
+  public synchronized void addGroup(GroupInfo groupInfo) throws IOException {
+    if (groupMap.get(groupInfo.getName()) != null ||
+        groupInfo.getName().equals(GroupInfo.DEFAULT_GROUP)) {
+      throw new DoNotRetryIOException("Group already exists: "+groupInfo.getName());
+    }
+    Map<String, GroupInfo> newGroupMap = Maps.newHashMap(groupMap);
+    newGroupMap.put(groupInfo.getName(), groupInfo);
+    flushConfig(newGroupMap);
+  }
+
+  @Override
+  public synchronized boolean moveServers(Set<HostPort> hostPorts, String srcGroup,
+                                          String dstGroup) throws IOException {
+    GroupInfo src = new GroupInfo(getGroup(srcGroup));
+    GroupInfo dst = new GroupInfo(getGroup(dstGroup));
+    boolean foundOne = false;
+    for(HostPort el: hostPorts) {
+      foundOne = src.removeServer(el) || foundOne;
+      dst.addServer(el);
+    }
+
+    Map<String,GroupInfo> newGroupMap = Maps.newHashMap(groupMap);
+    newGroupMap.put(src.getName(), src);
+    newGroupMap.put(dst.getName(), dst);
+
+    flushConfig(newGroupMap);
+    return foundOne;
+  }
+
+  /**
+   * Gets the group info of a server.
+   *
+   * @param hostPort the server
+   * @return An instance of GroupInfo.
+   */
+  @Override
+  public GroupInfo getGroupOfServer(HostPort hostPort) throws IOException {
+    for (GroupInfo info : groupMap.values()) {
+      if (info.containsServer(hostPort)){
+        return info;
+      }
+    }
+    return getGroup(GroupInfo.DEFAULT_GROUP);
+  }
+
+  /**
+   * Gets the group information.
+   *
+   * @param groupName the group name
+   * @return An instance of GroupInfo
+   */
+  @Override
+  public GroupInfo getGroup(String groupName) throws IOException {
+    return groupMap.get(groupName);
+  }
+
+  @Override
+  public String getGroupOfTable(TableName tableName) throws IOException {
+    return tableMap.get(tableName);
+  }
+
+  @Override
+  public synchronized void moveTables(Set<TableName> tableNames, String groupName) throws IOException {
+    if (groupName != null && !groupMap.containsKey(groupName)) {
+      throw new DoNotRetryIOException("Group "+groupName+" does not exist or is a special group");
+    }
+    Map<String,GroupInfo> newGroupMap = Maps.newHashMap(groupMap);
+    for(TableName tableName: tableNames) {
+      if (tableMap.containsKey(tableName)) {
+        GroupInfo src = new GroupInfo(groupMap.get(tableMap.get(tableName)));
+        src.removeTable(tableName);
+        newGroupMap.put(src.getName(), src);
+      }
+      if(groupName != null) {
+        GroupInfo dst = new GroupInfo(newGroupMap.get(groupName));
+        dst.addTable(tableName);
+        newGroupMap.put(dst.getName(), dst);
+      }
+    }
+
+    flushConfig(newGroupMap);
+  }
+
+
+  /**
+   * Delete a region server group.
+   *
+   * @param groupName the group name
+   * @throws java.io.IOException Signals that an I/O exception has occurred.
+   */
+  @Override
+  public synchronized void removeGroup(String groupName) throws IOException {
+    if (!groupMap.containsKey(groupName) || groupName.equals(GroupInfo.DEFAULT_GROUP)) {
+      throw new DoNotRetryIOException("Group "+groupName+" does not exist or is a reserved group");
+    }
+    Map<String,GroupInfo> newGroupMap = Maps.newHashMap(groupMap);
+    newGroupMap.remove(groupName);
+    flushConfig(newGroupMap);
+  }
+
+  @Override
+  public List<GroupInfo> listGroups() throws IOException {
+    return Lists.newLinkedList(groupMap.values());
+  }
+
+  @Override
+  public boolean isOnline() {
+    return groupStartupWorker.isOnline();
+  }
+
+  @Override
+  public synchronized void refresh() throws IOException {
+    refresh(false);
+  }
+
+  private synchronized void refresh(boolean forceOnline) throws IOException {
+    List<GroupInfo> groupList = new LinkedList<GroupInfo>();
+
+    //overwrite anything read from zk, group table is source of truth
+    //if online read from GROUP table
+    if (forceOnline || isOnline()) {
+      LOG.debug("Refreshing in Online mode.");
+      if (groupTable == null) {
+        groupTable = conn.getTable(GROUP_TABLE_NAME);
+      }
+      groupList.addAll(groupSerDe.retrieveGroupList(groupTable));
+    } else {
+      LOG.debug("Refershing in Offline mode.");
+      String groupBasePath = ZKUtil.joinZNode(watcher.baseZNode, groupZNode);
+      groupList.addAll(groupSerDe.retrieveGroupList(watcher, groupBasePath));
+    }
+
+    //refresh the default group: prune tables that are claimed by other groups
+    NavigableSet<TableName> orphanTables = new TreeSet<TableName>();
+    for(String entry: master.getTableDescriptors().getAll().keySet()) {
+      orphanTables.add(TableName.valueOf(entry));
+    }
+
+    List<TableName> specialTables;
+    if(!master.isInitialized()) {
+      specialTables = new ArrayList<TableName>();
+      specialTables.add(AccessControlLists.ACL_TABLE_NAME);
+      specialTables.add(TableName.META_TABLE_NAME);
+      specialTables.add(TableName.NAMESPACE_TABLE_NAME);
+      specialTables.add(GroupInfoManager.GROUP_TABLE_NAME);
+    } else {
+      specialTables =
+          master.listTableNamesByNamespace(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR);
+    }
+
+    for(TableName table : specialTables) {
+      orphanTables.add(table);
+    }
+    for(GroupInfo group: groupList) {
+      if(!group.getName().equals(GroupInfo.DEFAULT_GROUP)) {
+        orphanTables.removeAll(group.getTables());
+      }
+    }
+
+    //This is added at the end of the list
+    //so it overwrites the default group loaded
+    //from the group table or zk
+    groupList.add(new GroupInfo(GroupInfo.DEFAULT_GROUP,
+        new TreeSet<HostPort>(getDefaultServers()),
+        orphanTables));
+
+
+    //populate the data
+    HashMap<String, GroupInfo> newGroupMap = Maps.newHashMap();
+    HashMap<TableName, String> newTableMap = Maps.newHashMap();
+    for (GroupInfo group : groupList) {
+      newGroupMap.put(group.getName(), group);
+      for(TableName table: group.getTables()) {
+        newTableMap.put(table, group.getName());
+      }
+    }
+    groupMap = Collections.unmodifiableMap(newGroupMap);
+    tableMap = Collections.unmodifiableMap(newTableMap);
+
+    prevGroups.clear();
+    prevGroups.addAll(groupMap.keySet());
+  }
+
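+  //Storage model: all groups live in a single row (ROW_KEY) of the group
+  //table, one column per group under the meta family, each valued with the
+  //group's serialized protobuf form.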
+  private synchronized Map<TableName,String> flushConfigTable(Map<String,GroupInfo> newGroupMap) throws IOException {
+    Map<TableName,String> newTableMap = Maps.newHashMap();
+    Put put = new Put(ROW_KEY);
+    Delete delete = new Delete(ROW_KEY);
+
+    //populate deletes
+    for(String groupName : prevGroups) {
+      if(!newGroupMap.containsKey(groupName)) {
+        delete.deleteColumns(META_FAMILY_BYTES, Bytes.toBytes(groupName));
+      }
+    }
+
+    //populate puts
+    for(GroupInfo groupInfo : newGroupMap.values()) {
+      RSGroupProtos.GroupInfo proto = ProtobufUtil.toProtoGroupInfo(groupInfo);
+      put.add(META_FAMILY_BYTES,
+          Bytes.toBytes(groupInfo.getName()),
+          proto.toByteArray());
+      for(TableName entry: groupInfo.getTables()) {
+        newTableMap.put(entry, groupInfo.getName());
+      }
+    }
+
+    RowMutations rowMutations = new RowMutations(ROW_KEY);
+    if(put.size() > 0) {
+      rowMutations.add(put);
+    }
+    if(delete.size() > 0) {
+      rowMutations.add(delete);
+    }
+    if(rowMutations.getMutations().size() > 0) {
+      groupTable.mutateRow(rowMutations);
+    }
+    return newTableMap;
+  }
+
+  private synchronized void flushConfig(Map<String,GroupInfo> newGroupMap) throws IOException {
+    Map<TableName, String> newTableMap;
+    //being offline is only expected during startup, so refuse writes until online
+    if(!isOnline()) {
+      LOG.error("Still in Offline mode.");
+      throw new IOException("Still in Offline mode.");
+    }
+
+    newTableMap = flushConfigTable(newGroupMap);
+
+    //make changes visible since they have been
+    //persisted in the source of truth
+    groupMap = Collections.unmodifiableMap(newGroupMap);
+    tableMap = Collections.unmodifiableMap(newTableMap);
+
+
+    try {
+      //Write zk data first since that's what we'll read first
+      String groupBasePath = ZKUtil.joinZNode(watcher.baseZNode, groupZNode);
+      ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC);
+
+      List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<ZKUtil.ZKUtilOp>(newGroupMap.size());
+      for(String groupName : prevGroups) {
+        if(!newGroupMap.containsKey(groupName)) {
+          String znode = ZKUtil.joinZNode(groupBasePath, groupName);
+          zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
+        }
+      }
+
+
+      for(GroupInfo groupInfo : newGroupMap.values()) {
+        String znode = ZKUtil.joinZNode(groupBasePath, groupInfo.getName());
+        RSGroupProtos.GroupInfo proto = ProtobufUtil.toProtoGroupInfo(groupInfo);
+        LOG.debug("Updating znode: "+znode);
+        ZKUtil.createAndFailSilent(watcher, znode);
+        zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
+        zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode,
+            ProtobufUtil.prependPBMagic(proto.toByteArray())));
+      }
+      LOG.debug("Writing ZK GroupInfo count: " + zkOps.size());
+
+      ZKUtil.multiOrSequential(watcher, zkOps, false);
+    } catch (KeeperException e) {
+      LOG.error("Failed to write to groupZNode", e);
+      master.abort("Failed to write to groupZNode", e);
+      throw new IOException("Failed to write to groupZNode",e);
+    }
+
+    prevGroups.clear();
+    prevGroups.addAll(newGroupMap.keySet());
+  }
+
+  private List<ServerName> getOnlineRS() throws IOException {
+    if (master != null) {
+      return master.getServerManager().getOnlineServersList();
+    }
+    try {
+      LOG.debug("Reading online RS from zookeeper");
+      List<ServerName> servers = new LinkedList<ServerName>();
+      for (String el: ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode)) {
+        servers.add(ServerName.parseServerName(el));
+      }
+      return servers;
+    } catch (KeeperException e) {
+      throw new IOException("Failed to retrieve server list from zookeeper", e);
+    }
+  }
+
+  private List<HostPort> getDefaultServers() throws IOException {
+    List<HostPort> defaultServers = new LinkedList<HostPort>();
+    for(ServerName server : getOnlineRS()) {
+      HostPort hostPort = new HostPort(server.getHostname(), server.getPort());
+      boolean found = false;
+      for(GroupInfo groupInfo : groupMap.values()) {
+        if(!GroupInfo.DEFAULT_GROUP.equals(groupInfo.getName()) &&
+            groupInfo.containsServer(hostPort)) {
+          found = true;
+          break;
+        }
+      }
+      if(!found) {
+        defaultServers.add(hostPort);
+      }
+    }
+    return defaultServers;
+  }
+
+  private synchronized void updateDefaultServers(
+      NavigableSet<HostPort> hostPort) throws IOException {
+    if(!isOnline()) {
+      LOG.info("Offline mode. Skipping update of default servers");
+      return;
+    }
+    GroupInfo info = groupMap.get(GroupInfo.DEFAULT_GROUP);
+    GroupInfo newInfo = new GroupInfo(info.getName(), hostPort, info.getTables());
+    HashMap<String, GroupInfo> newGroupMap = Maps.newHashMap(groupMap);
+    newGroupMap.put(newInfo.getName(), newInfo);
+    flushConfig(newGroupMap);
+  }
+
+  @Override
+  public void serverAdded(ServerName serverName) {
+    defaultServerUpdater.serverChanged();
+  }
+
+  @Override
+  public void serverRemoved(ServerName serverName) {
+    defaultServerUpdater.serverChanged();
+  }
+
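+  /**
+   * Background thread that keeps the default group's membership current:
+   * when the server list changes it recomputes the set of online servers
+   * not claimed by any other group and persists it via flushConfig.
+   */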
+  private static class DefaultServerUpdater extends Thread {
+    private static final Log LOG = LogFactory.getLog(DefaultServerUpdater.class);
+    private GroupInfoManagerImpl mgr;
+    private boolean hasChanged = false;
+
+    public DefaultServerUpdater(GroupInfoManagerImpl mgr) {
+      this.mgr = mgr;
+    }
+
+    public void run() {
+      List<HostPort> prevDefaultServers = new LinkedList<HostPort>();
+      while(!mgr.master.isAborted() && !mgr.master.isStopped()) {
+        try {
+          LOG.info("Updating default servers.");
+          List<HostPort> servers = mgr.getDefaultServers();
+          Collections.sort(servers);
+          if(!servers.equals(prevDefaultServers)) {
+            mgr.updateDefaultServers(new TreeSet<HostPort>(servers));
+            prevDefaultServers = servers;
+            LOG.info("Updated with servers: "+servers.size());
+          }
+          try {
+            synchronized (this) {
+              if(!hasChanged) {
+                wait();
+              }
+              hasChanged = false;
+            }
+          } catch (InterruptedException e) {
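+            //woken by serverChanged() or interrupted; loop and re-check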
+          }
+        } catch (IOException e) {
+          LOG.warn("Failed to update default servers", e);
+        }
+      }
+    }
+
+    public void serverChanged() {
+      synchronized (this) {
+        hasChanged = true;
+        this.notify();
+      }
+    }
+  }
+
+
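+  /**
+   * Daemon thread that waits for the group table to come online, creating
+   * it if missing, before the manager switches from Offline (zookeeper
+   * backed) to Online (table backed) mode.
+   */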
+  private static class GroupStartupWorker extends Thread {
+    private static final Log LOG = LogFactory.getLog(GroupStartupWorker.class);
+
+    private Configuration conf;
+    private volatile boolean isOnline = false;
+    private MasterServices masterServices;
+    private GroupInfoManagerImpl groupInfoManager;
+    private ClusterConnection conn;
+
+    public GroupStartupWorker(GroupInfoManagerImpl groupInfoManager,
+                              MasterServices masterServices,
+                              ClusterConnection conn) {
+      this.conf = masterServices.getConfiguration();
+      this.masterServices = masterServices;
+      this.groupInfoManager = groupInfoManager;
+      this.conn = conn;
+      setName(GroupStartupWorker.class.getName()+"-"+masterServices.getServerName());
+      setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+      if(waitForGroupTableOnline()) {
+        LOG.info("GroupBasedLoadBalancer is now online");
+      }
+    }
+
+    public boolean waitForGroupTableOnline() {
+      final List<HRegionInfo> foundRegions = new LinkedList<HRegionInfo>();
+      final List<HRegionInfo> assignedRegions = new LinkedList<HRegionInfo>();
+      final AtomicBoolean found = new AtomicBoolean(false);
+      final TableStateManager tsm = masterServices.getTableStateManager();
+      boolean createSent = false;
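+      //Loop until meta is reachable and every group table region is both
+      //assigned and answering reads; create the group table if meta holds
+      //no regions for it yet, then refresh caches and flip to online.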
+      while (!found.get() && isMasterRunning()) {
+        foundRegions.clear();
+        assignedRegions.clear();
+        found.set(true);
+        try {
+          final Table nsTable = conn.getTable(TableName.NAMESPACE_TABLE_NAME);
+          final Table groupTable = conn.getTable(GroupInfoManager.GROUP_TABLE_NAME);
+          boolean rootMetaFound =
+              masterServices.getMetaTableLocator().verifyMetaRegionLocation(
+                  conn,
+                  masterServices.getZooKeeper(),
+                  1);
+          final AtomicBoolean nsFound = new AtomicBoolean(false);
+          if (rootMetaFound) {
+
+            MetaTableAccessor.Visitor visitor = new DefaultVisitorBase() {
+              @Override
+              public boolean visitInternal(Result row) throws IOException {
+                HRegionInfo info = HRegionInfo.getHRegionInfo(row);
+                if (info != null) {
+                  Cell serverCell =
+                      row.getColumnLatestCell(HConstants.CATALOG_FAMILY,
+                          HConstants.SERVER_QUALIFIER);
+                  if (GROUP_TABLE_NAME.equals(info.getTable()) && serverCell != null) {
+                    ServerName sn =
+                        ServerName.parseVersionedServerName(CellUtil.cloneValue(serverCell));
+                    if (sn == null) {
+                      found.set(false);
+                    } else if (tsm.isTableState(GROUP_TABLE_NAME, TableState.State.ENABLED)) {
+                      try {
+                        HBaseProtos.RegionSpecifier regionSpecifier =
+                            HBaseProtos.RegionSpecifier.newBuilder()
+                                .setValue(ByteString.copyFrom(row.getRow()))
+                                .setType(
+                                    HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME)
+                                .build();
+                        ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn);
+                        ClientProtos.GetRequest req =
+                            ClientProtos.GetRequest.newBuilder()
+                                .setRegion(regionSpecifier)
+                                .setGet(ProtobufUtil.toGet(new Get(ROW_KEY))).build();
+                        rs.get(null, req);
+                        assignedRegions.add(info);
+                      } catch(Exception ex) {
+                        LOG.debug("Caught exception while verifying group region", ex);
+                      }
+                    }
+                    foundRegions.add(info);
+                  }
+                  if (TableName.NAMESPACE_TABLE_NAME.equals(info.getTable())) {
+                    Cell cell = row.getColumnLatestCell(HConstants.CATALOG_FAMILY,
+                        HConstants.SERVER_QUALIFIER);
+                    ServerName sn = null;
+                    if(cell != null) {
+                      sn = ServerName.parseVersionedServerName(CellUtil.cloneValue(cell));
+                    }
+                    if (tsm.isTableState(TableName.NAMESPACE_TABLE_NAME,
+                        TableState.State.ENABLED)) {
+                      try {
+                        HBaseProtos.RegionSpecifier regionSpecifier =
+                            HBaseProtos.RegionSpecifier.newBuilder()
+                                .setValue(ByteString.copyFrom(row.getRow()))
+                                .setType(
+                                    HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME)
+                                .build();
+                        ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn);
+                        ClientProtos.GetRequest req =
+                            ClientProtos.GetRequest.newBuilder()
+                                .setRegion(regionSpecifier)
+                                .setGet(ProtobufUtil.toGet(new Get(ROW_KEY))).build();
+                        rs.get(null, req);
+                        nsFound.set(true);
+                      } catch(Exception ex) {
+                        LOG.debug("Caught exception while verifying group region", ex);
+                      }
+                    }
+                  }
+                }
+                return true;
+              }
+            };
+            MetaTableAccessor.fullScanRegions(conn, visitor);
+            // if no regions in meta then we have to create the table
+            if (foundRegions.size() < 1 && rootMetaFound && !createSent && nsFound.get()) {
+              groupInfoManager.createGroupTable(masterServices);
+              createSent = true;
+            }
+            LOG.info("Group table: " + GROUP_TABLE_NAME + " isOnline: " + found.get()
+                + ", regionCount: " + foundRegions.size() + ", assignCount: "
+                + assignedRegions.size() + ", rootMetaFound: "+rootMetaFound);
+            found.set(found.get() && assignedRegions.size() == foundRegions.size()
+                && foundRegions.size() > 0);
+          } else {
+            LOG.info("Waiting for catalog tables to come online");
+            found.set(false);
+          }
+          if (found.get()) {
+            LOG.debug("With group table online, refreshing cached information.");
+            groupInfoManager.refresh(true);
+            isOnline = true;
+            //flush any inconsistencies between ZK and HTable
+            groupInfoManager.flushConfig(groupInfoManager.groupMap);
+          }
+        } catch(Exception e) {
+          found.set(false);
+          LOG.warn("Failed to perform check", e);
+        }
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          LOG.info("Sleep interrupted", e);
+        }
+      }
+      return found.get();
+    }
+
+    public boolean isOnline() {
+      return isOnline;
+    }
+
+    private boolean isMasterRunning() {
+      return !masterServices.isAborted() && !masterServices.isStopped();
+    }
+  }
+
+  private void createGroupTable(MasterServices masterServices) throws IOException {
+    HRegionInfo[] newRegions = new HRegionInfo[]{
+        new HRegionInfo(GROUP_TABLE_DESC.getTableName(), null, null)};
+    //we need to create the table this way to bypass
+    //checkInitialized
+    masterServices.getExecutorService()
+        .submit(new CreateTableHandler(
+            masterServices,
+            masterServices.getMasterFileSystem(),
+            GROUP_TABLE_DESC,
+            masterServices.getConfiguration(),
+            newRegions,
+            masterServices).prepare());
+    //wait for region to be online
+    int tries = 600;
+    while(masterServices.getAssignmentManager().getRegionStates()
+        .getRegionServerOfRegion(newRegions[0]) == null && tries > 0) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        throw new IOException("Wait interrupted", e);
+      }
+      tries--;
+    }
+    if(tries <= 0) {
+      throw new IOException("Failed to create group table.");
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/16f65bad/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupSerDe.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupSerDe.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupSerDe.java
new file mode 100644
index 0000000..b0262ef
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupSerDe.java
@@ -0,0 +1,97 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.group;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableMap;
+
+//TODO do better encapsulation of SerDe logic from GroupInfoManager and GroupTracker
+public class GroupSerDe {
+  private static final Log LOG = LogFactory.getLog(GroupSerDe.class);
+
+  public GroupSerDe() {
+  }
+
+  public List<GroupInfo> retrieveGroupList(Table groupTable) throws IOException {
+    List<GroupInfo> groupInfoList = Lists.newArrayList();
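+    //The group table keeps all groups in one row: each column of the meta
+    //family is a group name valued with its serialized protobuf.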
+    Result result = groupTable.get(new Get(GroupInfoManager.ROW_KEY));
+    if(!result.isEmpty()) {
+      NavigableMap<byte[],NavigableMap<byte[],byte[]>> dataMap = result.getNoVersionMap();
+      for(byte[] groupName: dataMap.get(GroupInfoManager.META_FAMILY_BYTES).keySet()) {
+        RSGroupProtos.GroupInfo proto =
+            RSGroupProtos.GroupInfo.parseFrom(
+                dataMap.get(GroupInfoManager.META_FAMILY_BYTES).get(groupName));
+        groupInfoList.add(ProtobufUtil.toGroupInfo(proto));
+      }
+    }
+    return groupInfoList;
+  }
+
+  public List<GroupInfo> retrieveGroupList(ZooKeeperWatcher watcher,
+                                           String groupBasePath) throws IOException {
+    List<GroupInfo> groupInfoList = Lists.newArrayList();
+    //Read the group information mirrored to zookeeper for offline bootstrap
+    try {
+      if(ZKUtil.checkExists(watcher, groupBasePath) != -1) {
+        for(String znode: ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath)) {
+          byte[] data = ZKUtil.getData(watcher, ZKUtil.joinZNode(groupBasePath, znode));
+          if(data.length > 0) {
+            ProtobufUtil.expectPBMagicPrefix(data);
+            ByteArrayInputStream bis = new ByteArrayInputStream(
+                data, ProtobufUtil.lengthOfPBMagic(), data.length);
+            groupInfoList.add(ProtobufUtil.toGroupInfo(RSGroupProtos.GroupInfo.parseFrom(bis)));
+          }
+        }
+        LOG.debug("Read ZK GroupInfo count:" + groupInfoList.size());
+      }
+    } catch (KeeperException | DeserializationException | InterruptedException e) {
+      throw new IOException("Failed to read groupZNode", e);
+    }
+    return groupInfoList;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/16f65bad/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupTracker.java
new file mode 100644
index 0000000..f0c0a8f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupTracker.java
@@ -0,0 +1,341 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.group;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HostPort;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
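+/**
+ * Tracks the group znodes mirrored in zookeeper and maintains an in-memory
+ * view of group and server membership, refreshed whenever the znodes change.
+ */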
+public class GroupTracker extends ZooKeeperNodeTracker {
+  private static final Log LOG = LogFactory.getLog(GroupTracker.class);
+
+  private List<Listener> listeners = Collections.synchronizedList(new ArrayList<Listener>());
+  private GroupSerDe groupSerDe = new GroupSerDe();
+  private volatile Map<String, GroupInfo> groupMap = new HashMap<String, GroupInfo>();
+  private volatile Map<HostPort, GroupInfo> serverMap = new HashMap<HostPort, GroupInfo>();
+  private RegionServerTracker rsTracker;
+  private volatile boolean started = false;
+
+  /**
+   * Constructs a new ZK group node tracker.
+   * <p>
+   * After construction, use {@link #start} to kick off tracking.
+   *
+   * @param watcher zookeeper watcher to register with
+   * @param abortable abortable in case of fatal error; if null, a
+   *                  self-recovering abortable is substituted
+   */
+  public GroupTracker(ZooKeeperWatcher watcher, Abortable abortable) throws IOException {
+    //TODO make period configurable
+    super(watcher,
+        ZKUtil.joinZNode(watcher.baseZNode, GroupInfoManager.groupZNode),
+        abortable!=null?abortable:new PersistentAbortable(10000));
+    if(abortable == null) {
+      ((PersistentAbortable)this.abortable).setGroupTracker(this);
+    }
+    rsTracker = new RegionServerTracker(watcher, abortable, this);
+    try {
+      ZKUtil.listChildrenAndWatchThem(watcher, node);
+      rsTracker.start();
+    } catch (KeeperException e) {
+      throw new IOException("Failed to start RS tracker", e);
+    }
+  }
+
+  public void addListener(Listener listener) {
+    listeners.add(listener);
+  }
+
+  public void removeListener(Listener listener) {
+    listeners.remove(listener);
+  }
+
+  @Override
+  public synchronized void start() {
+    super.start();
+    started = true;
+  }
+
+  @Override
+  public void nodeCreated(String path) {
+    if (path.equals(node)) {
+      refresh();
+    }
+  }
+
+  @Override
+  public void nodeDataChanged(String path) {
+    if (path.equals(node)) {
+      nodeCreated(path);
+    }
+  }
+
+  @Override
+  public void nodeChildrenChanged(String path) {
+    if (path.startsWith(node)) {
+      refresh();
+    }
+  }
+
+  public void blockUntilReady(int timeout) throws InterruptedException, IOException {
+    blockUntilAvailable(timeout, false);
+    if(getData(false) != null) {
+      refresh(false);
+    }
+  }
+
+  private void refresh() {
+    try {
+      refresh(false);
+    } catch (IOException e) {
+      this.abortable.abort("Failed to read group znode", e);
+    }
+  }
+
+  private synchronized void refresh(boolean force) throws IOException {
+    List<ServerName> onlineRS = rsTracker.getOnlineServers();
+    Set<HostPort> hostPorts = new HashSet<HostPort>();
+    for(ServerName entry: onlineRS) {
+      hostPorts.add(new HostPort(entry.getHostname(), entry.getPort()));
+    }
+    Map<String, GroupInfo> tmpGroupMap = new HashMap<String, GroupInfo>();
+    Map<HostPort, GroupInfo> tmpServerMap = new HashMap<HostPort, GroupInfo>();
+    for(GroupInfo groupInfo: listGroups()) {
+      tmpGroupMap.put(groupInfo.getName(), groupInfo);
+      for(HostPort server: groupInfo.getServers()) {
+        tmpServerMap.put(server, groupInfo);
+        hostPorts.remove(server);
+      }
+    }
+    GroupInfo defaultInfo = tmpGroupMap.get(GroupInfo.DEFAULT_GROUP);
+    if (defaultInfo != null) {
+      //servers not claimed by any group fall back into the default group
+      defaultInfo.addAllServers(hostPorts);
+      for(HostPort entry: hostPorts) {
+        tmpServerMap.put(entry, defaultInfo);
+      }
+    }
+
+    //readers that depend on the groupMap<->serverMap
+    //invariant must synchronize on "this"
+    groupMap = tmpGroupMap;
+    serverMap = tmpServerMap;
+
+    Map<String, GroupInfo> map = getGroupMap();
+    for(Listener listener : listeners) {
+      listener.groupMapChanged(map);
+    }
+  }
+
+  private List<GroupInfo> listGroups() throws IOException {
+    return groupSerDe.retrieveGroupList(watcher, node);
+  }
+
+  public GroupInfo getGroup(String name) {
+    return groupMap.get(name);
+  }
+
+  public GroupInfo getGroupOfServer(HostPort hostPort) {
+    //serverMap is keyed by HostPort, so the lookup key must be a HostPort
+    return serverMap.get(hostPort);
+  }
+
+  public Map<String, GroupInfo> getGroupMap() {
+    return Collections.unmodifiableMap(groupMap);
+  }
+
+  public interface Listener {
+    public void groupMapChanged(Map<String, GroupInfo> groupMap);
+  }
+
+
+  /**
+   * This class is copied from RegionServerTracker.
+   * We need our own copy since the original is tied to the
+   * ServerManager, and thus to the master.
+   */
+  private static class RegionServerTracker extends ZooKeeperListener {
+    private static final Log LOG = LogFactory.getLog(RegionServerTracker.class);
+    private volatile List<ServerName> regionServers = new ArrayList<ServerName>();
+    private Abortable abortable;
+    private GroupTracker groupTracker;
+
+    public RegionServerTracker(ZooKeeperWatcher watcher,
+        Abortable abortable, GroupTracker groupTracker) {
+      super(watcher);
+      this.abortable = abortable;
+      this.groupTracker = groupTracker;
+    }
+
+    public void start() throws KeeperException, IOException {
+      watcher.registerListener(this);
+      refresh();
+    }
+
+    private void add(final List<String> servers) throws IOException {
+      List<ServerName> temp = new ArrayList<ServerName>();
+      for (String n: servers) {
+        ServerName sn = ServerName.parseServerName(ZKUtil.getNodeName(n));
+        temp.add(sn);
+      }
+      regionServers = temp;
+      //we're refreshing groups, since default membership
+      //is dynamic and new servers may end up as new default group members
+      refreshGroups();
+    }
+
+    private void remove(final ServerName sn) {
+      List<ServerName> temp = new ArrayList<ServerName>();
+      for(ServerName el: regionServers) {
+        if(!sn.equals(el)) {
+          temp.add(el);
+        }
+      }
+      regionServers = temp;
+      refreshGroups();
+    }
+
+    private void refreshGroups() {
+      if(groupTracker.started && groupTracker.getData(false) != null) {
+        groupTracker.refresh();
+      }
+    }
+
+    public void refresh() throws KeeperException, IOException {
+      List<String> servers =
+        ZKUtil.listChildrenAndWatchThem(watcher, watcher.rsZNode);
+      add(servers);
+    }
+
+    @Override
+    public void nodeDeleted(String path) {
+      if (path.startsWith(watcher.rsZNode)) {
+        String serverName = ZKUtil.getNodeName(path);
+        LOG.info("RegionServer ephemeral node deleted, processing expiration [" +
+          serverName + "]");
+        ServerName sn = ServerName.parseServerName(serverName);
+        remove(sn);
+      }
+    }
+
+    @Override
+    public void nodeChildrenChanged(String path) {
+      if (path.equals(watcher.rsZNode)) {
+        try {
+          List<String> servers =
+            ZKUtil.listChildrenAndWatchThem(watcher, watcher.rsZNode);
+          add(servers);
+        } catch (IOException e) {
+          abortable.abort("Unexpected zk exception getting RS nodes", e);
+        } catch (KeeperException e) {
+          abortable.abort("Unexpected zk exception getting RS nodes", e);
+        }
+      }
+    }
+
+    /**
+     * Gets the online servers.
+     * @return list of online servers
+     */
+    public List<ServerName> getOnlineServers() {
+      return regionServers;
+    }
+  }
+
+  private static class Refresher extends Thread {
+    private final static Log LOG = LogFactory.getLog(Refresher.class);
+    private GroupTracker groupTracker;
+    private volatile boolean isRunning = true;
+    private int period;
+
+    public Refresher(GroupTracker groupTracker, int period) {
+      this.groupTracker = groupTracker;
+      this.period = period;
+      this.setDaemon(true);
+    }
+
+    public boolean isRunning() {
+      return isRunning;
+    }
+
+    @Override
+    public void run() {
+      while(true) {
+        try {
+          groupTracker.rsTracker.refresh();
+          groupTracker.refresh(true);
+          LOG.info("Recovery refresh successful");
+          isRunning = false;
+          return;
+        } catch (IOException e) {
+          LOG.warn("Failed to refresh", e);
+        } catch (KeeperException e) {
+          LOG.warn("Failed to refresh", e);
+        }
+        try {
+          Thread.sleep(period);
+        } catch (InterruptedException e) {
+          //restore interrupt status and stop retrying
+          Thread.currentThread().interrupt();
+          isRunning = false;
+          return;
+        }
+      }
+    }
+  }
+
+  private static class PersistentAbortable implements Abortable {
+    private final Log LOG = LogFactory.getLog(PersistentAbortable.class);
+    private Refresher refresher;
+    private GroupTracker groupTracker;
+    private int period;
+
+
+    public PersistentAbortable(int period) {
+      this.period = period;
+    }
+
+    public void setGroupTracker(GroupTracker groupTracker) {
+      this.groupTracker = groupTracker;
+    }
+
+    @Override
+    public void abort(String why, Throwable e) {
+      LOG.warn("Launching referesher because of abort: "+why, e);
+      if(refresher == null || !refresher.isRunning()) {
+        refresher = new Refresher(groupTracker, period);
+      }
+    }
+
+    @Override
+    public boolean isAborted() {
+      return false;
+    }
+  }
+}

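A usage sketch, not part of this patch: a client of GroupTracker registers a Listener, starts tracking, and blocks until the group znode has been read; the ZooKeeperWatcher and Abortable are assumed to come from the hosting service.

    import java.io.IOException;
    import java.util.Map;

    import org.apache.hadoop.hbase.Abortable;
    import org.apache.hadoop.hbase.group.GroupInfo;
    import org.apache.hadoop.hbase.group.GroupTracker;
    import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

    public class GroupTrackerExample {
      public static GroupTracker follow(ZooKeeperWatcher watcher, Abortable abortable)
          throws IOException, InterruptedException {
        GroupTracker tracker = new GroupTracker(watcher, abortable);
        tracker.addListener(new GroupTracker.Listener() {
          @Override
          public void groupMapChanged(Map<String, GroupInfo> groupMap) {
            //invoked on every refresh; the map handed out is unmodifiable
            System.out.println("group membership changed: " + groupMap.keySet());
          }
        });
        tracker.start();
        tracker.blockUntilReady(30000);  //wait up to 30s for the group znode
        return tracker;
      }
    }
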
http://git-wip-us.apache.org/repos/asf/hbase/blob/16f65bad/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupableBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupableBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupableBalancer.java
new file mode 100644
index 0000000..e696926
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/GroupableBalancer.java
@@ -0,0 +1,12 @@
+package org.apache.hadoop.hbase.group;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.master.LoadBalancer;
+
+import java.io.IOException;
+
+@InterfaceAudience.Private
+public interface GroupableBalancer extends LoadBalancer {
+
+  void setGroupInfoManager(GroupInfoManager groupInfoManager) throws IOException;
+}

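GroupableBalancer is the only contract a group-aware balancer adds on top of LoadBalancer. A shape-only sketch, not part of this patch (GroupBasedLoadBalancer is the real implementation; extending the stock StochasticLoadBalancer is assumed to supply the remaining LoadBalancer methods):

    import java.io.IOException;

    import org.apache.hadoop.hbase.group.GroupInfoManager;
    import org.apache.hadoop.hbase.group.GroupableBalancer;
    import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;

    public class ExampleGroupableBalancer extends StochasticLoadBalancer
        implements GroupableBalancer {
      private GroupInfoManager groupInfoManager;

      @Override
      public void setGroupInfoManager(GroupInfoManager groupInfoManager) throws IOException {
        //HMaster injects the manager at startup (see the HMaster hunk later in this patch)
        this.groupInfoManager = groupInfoManager;
      }
    }
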
http://git-wip-us.apache.org/repos/asf/hbase/blob/16f65bad/hbase-server/src/main/java/org/apache/hadoop/hbase/group/MXBean.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/group/MXBean.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/MXBean.java
new file mode 100644
index 0000000..a19b24e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/MXBean.java
@@ -0,0 +1,65 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.group;
+
+import org.apache.hadoop.hbase.HostPort;
+import org.apache.hadoop.hbase.TableName;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+public interface MXBean {
+
+  public Map<String, List<HostPort>> getServersByGroup() throws IOException;
+
+  public List<GroupInfoBean> getGroups() throws IOException;
+
+  public static class GroupInfoBean {
+
+    private String name;
+    private List<HostPort> servers;
+    private List<TableName> tables;
+
+    //Need this to convert NavigableSet to List
+    public GroupInfoBean(GroupInfo groupInfo) {
+      this.name = groupInfo.getName();
+      this.servers = new LinkedList<HostPort>();
+      this.servers.addAll(groupInfo.getServers());
+      this.tables = new LinkedList<TableName>();
+      this.tables.addAll(groupInfo.getTables());
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public List<HostPort> getServers() {
+      return servers;
+    }
+
+    public List<TableName> getTables() {
+      return tables;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/16f65bad/hbase-server/src/main/java/org/apache/hadoop/hbase/group/MXBeanImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/group/MXBeanImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/MXBeanImpl.java
new file mode 100644
index 0000000..b0894eb
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/group/MXBeanImpl.java
@@ -0,0 +1,82 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.group;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HostPort;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.MasterServices;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+public class MXBeanImpl implements MXBean {
+  private static final Log LOG = LogFactory.getLog(MXBeanImpl.class);
+
+  private static MXBeanImpl instance = null;
+
+  private GroupAdmin groupAdmin;
+  private MasterServices master;
+
+  public synchronized static MXBeanImpl init(
+      final GroupAdmin groupAdmin,
+      MasterServices master) {
+    if (instance == null) {
+      instance = new MXBeanImpl(groupAdmin, master);
+    }
+    return instance;
+  }
+
+  protected MXBeanImpl(final GroupAdmin groupAdmin,
+      MasterServices master) {
+    this.groupAdmin = groupAdmin;
+    this.master = master;
+  }
+
+  @Override
+  public Map<String, List<HostPort>> getServersByGroup() throws IOException {
+    Map<String, List<HostPort>> data = new HashMap<String, List<HostPort>>();
+    for (final ServerName entry :
+      master.getServerManager().getOnlineServersList()) {
+      GroupInfo groupInfo = groupAdmin.getGroupOfServer(
+          new HostPort(entry.getHostname(), entry.getPort()));
+      if(!data.containsKey(groupInfo.getName())) {
+        data.put(groupInfo.getName(), new LinkedList<HostPort>());
+      }
+      data.get(groupInfo.getName()).add(entry.getHostPort());
+    }
+    return data;
+  }
+
+  @Override
+  public List<GroupInfoBean> getGroups() throws IOException {
+    List<GroupInfoBean> list = new LinkedList<GroupInfoBean>();
+    for(GroupInfo group: groupAdmin.listGroups()) {
+      list.add(new GroupInfoBean(group));
+    }
+    return list;
+  }
+
+}

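A consumption sketch, not part of this patch: once initialized, the bean can be queried directly (or registered with a JMX server) to dump group membership.

    import java.io.IOException;

    import org.apache.hadoop.hbase.group.GroupAdmin;
    import org.apache.hadoop.hbase.group.MXBean;
    import org.apache.hadoop.hbase.group.MXBeanImpl;
    import org.apache.hadoop.hbase.master.MasterServices;

    public class GroupMXBeanExample {
      public static void dumpGroups(GroupAdmin groupAdmin, MasterServices master)
          throws IOException {
        MXBean bean = MXBeanImpl.init(groupAdmin, master);
        for (MXBean.GroupInfoBean group : bean.getGroups()) {
          System.out.println(group.getName()
              + " servers=" + group.getServers().size()
              + " tables=" + group.getTables().size());
        }
      }
    }
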
http://git-wip-us.apache.org/repos/asf/hbase/blob/16f65bad/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index f7f98fe..248aafc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -1033,7 +1033,7 @@ public class AssignmentManager {
           return;
         }
         LOG.info("Assigning " + region.getRegionNameAsString() +
-            " to " + plan.getDestination().toString());
+            " to " + plan.getDestination());
         // Transition RegionState to PENDING_OPEN
        regionStates.updateRegionState(region,
           State.PENDING_OPEN, plan.getDestination());
@@ -1222,8 +1222,7 @@ public class AssignmentManager {
           || existingPlan.getDestination() == null
           || !destServers.contains(existingPlan.getDestination())) {
         newPlan = true;
-        randomPlan = new RegionPlan(region, null,
-            balancer.randomAssignment(region, destServers));
+        randomPlan = new RegionPlan(region, null, balancer.randomAssignment(region, destServers));
         if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
           List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
           regions.add(region);
@@ -1468,6 +1467,14 @@ public class AssignmentManager {
       throw new IOException("Unable to determine a plan to assign region(s)");
     }
 
+    if (bulkPlan.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
+      // Found no plan for some regions, put those regions in RIT
+      for (HRegionInfo hri : bulkPlan.get(LoadBalancer.BOGUS_SERVER_NAME)) {
+        regionStates.updateRegionState(hri, State.FAILED_OPEN);
+      }
+      bulkPlan.remove(LoadBalancer.BOGUS_SERVER_NAME);
+    }
+
     assign(regions.size(), servers.size(),
       "retainAssignment=true", bulkPlan);
   }
@@ -1497,6 +1504,14 @@ public class AssignmentManager {
       throw new IOException("Unable to determine a plan to assign region(s)");
     }
 
+    if (bulkPlan.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
+      // Found no plan for some regions, put those regions in RIT
+      for (HRegionInfo hri : bulkPlan.get(LoadBalancer.BOGUS_SERVER_NAME)) {
+        regionStates.updateRegionState(hri, State.FAILED_OPEN);
+      }
+      bulkPlan.remove(LoadBalancer.BOGUS_SERVER_NAME);
+    }
+
     processFavoredNodes(regions);
     assign(regions.size(), servers.size(), "round-robin=true", bulkPlan);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/16f65bad/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index a4bbe6f..503d9b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -45,6 +45,11 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Service;
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -76,9 +81,13 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.constraint.ConstraintException;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.executor.ExecutorType;
+import org.apache.hadoop.hbase.group.GroupAdminServer;
+import org.apache.hadoop.hbase.group.GroupInfo;
+import org.apache.hadoop.hbase.group.GroupableBalancer;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.master.MasterRpcServices.BalanceSwitchMode;
@@ -323,6 +332,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
   // handle table states
   private TableStateManager tableStateManager;
 
+  private GroupAdminServer groupAdminServer;
+
   /** flag used in test cases in order to simulate RS failures during master initialization */
   private volatile boolean initializationBeforeMetaAssignment = false;
 
@@ -718,6 +729,11 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
       waitForServerOnline();
     }
 
+    if (balancer instanceof GroupableBalancer) {
+      groupAdminServer = new GroupAdminServer(this);
+      ((GroupableBalancer)balancer).setGroupInfoManager(groupAdminServer.getGroupInfoManager());
+    }
+
     //initialize load balancer
     this.balancer.setClusterStatus(getClusterStatus());
     this.balancer.setMasterServices(this);
@@ -1373,11 +1389,17 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
       final byte[] destServerName) throws HBaseIOException {
     RegionState regionState = assignmentManager.getRegionStates().
       getRegionState(Bytes.toString(encodedRegionName));
-    if (regionState == null) {
+
+    HRegionInfo hri;
+    if (Bytes.toString(encodedRegionName)
+        .equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
+      hri = HRegionInfo.FIRST_META_REGIONINFO;
+    } else if (regionState != null) {
+      hri = regionState.getRegion();
+    } else {
       throw new UnknownRegionException(Bytes.toStringBinary(encodedRegionName));
     }
 
-    HRegionInfo hri = regionState.getRegion();
     ServerName dest;
     if (destServerName == null || destServerName.length == 0) {
       LOG.info("Passed destination servername is null/empty so " +
@@ -1390,7 +1412,12 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
         return;
       }
     } else {
-      dest = ServerName.valueOf(Bytes.toString(destServerName));
+      ServerName candidate = ServerName.valueOf(Bytes.toString(destServerName));
+      dest = balancer.randomAssignment(hri, Lists.newArrayList(candidate));
+      if (dest == null) {
+        LOG.debug("Unable to determine a plan to assign " + hri);
+        return;
+      }
       if (dest.equals(serverName) && balancer instanceof BaseLoadBalancer
           && !((BaseLoadBalancer)balancer).shouldBeOnMaster(hri)) {
         // To avoid unnecessary region moving later by balancer. Don't put user
@@ -1453,7 +1480,6 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     HRegionInfo[] newRegions = ModifyRegionUtils.createHRegionInfos(hTableDescriptor, splitKeys);
     checkInitialized();
     sanityCheckTableDescriptor(hTableDescriptor);
-
     if (cpHost != null) {
       cpHost.preCreateTable(hTableDescriptor, newRegions);
     }
@@ -1463,16 +1489,14 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     //       TableExistsException by saying if the schema is the same or not.
     ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
     long procId = this.procedureExecutor.submitProcedure(
-      new CreateTableProcedure(
-        procedureExecutor.getEnvironment(), hTableDescriptor, newRegions, latch),
-      nonceGroup,
-      nonce);
+        new CreateTableProcedure(
+            procedureExecutor.getEnvironment(), hTableDescriptor, newRegions, latch),
+        nonceGroup,
+        nonce);
     latch.await();
-
     if (cpHost != null) {
       cpHost.postCreateTable(hTableDescriptor, newRegions);
     }
-
     return procId;
   }
 
@@ -2332,6 +2356,11 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
   public void createNamespace(NamespaceDescriptor descriptor) throws IOException {
     TableName.isLegalNamespaceName(Bytes.toBytes(descriptor.getName()));
     checkNamespaceManagerReady();
+
+    String group = descriptor.getConfigurationValue(GroupInfo.NAMESPACEDESC_PROP_GROUP);
+    if(group != null && groupAdminServer.getGroupInfo(group) == null) {
+      throw new ConstraintException("Region server group " + group + " does not exist");
+    }
     if (cpHost != null) {
       if (cpHost.preCreateNamespace(descriptor)) {
         return;
@@ -2348,6 +2377,11 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
   public void modifyNamespace(NamespaceDescriptor descriptor) throws IOException {
     TableName.isLegalNamespaceName(Bytes.toBytes(descriptor.getName()));
     checkNamespaceManagerReady();
+
+    String group = descriptor.getConfigurationValue(GroupInfo.NAMESPACEDESC_PROP_GROUP);
+    if(group != null && groupAdminServer.getGroupInfo(group) == null) {
+      throw new ConstraintException("Region server group " + group + " does not exist");
+    }
     if (cpHost != null) {
       if (cpHost.preModifyNamespace(descriptor)) {
         return;
@@ -2667,4 +2701,14 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     return conf.get(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, LoadBalancerFactory
         .getDefaultLoadBalancerClass().getName());
   }
+
+  @Override
+  public LoadBalancer getLoadBalancer() {
+    return balancer;
+  }
+
+  @Override
+  public GroupAdminServer getGroupAdminServer() {
+    return groupAdminServer;
+  }
 }

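The namespace checks added above guard a client-visible convention: a namespace may pin its tables to a region server group through a descriptor property. A client-side sketch, not part of this patch (namespace and group names are made up):

    import java.io.IOException;

    import org.apache.hadoop.hbase.NamespaceDescriptor;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.group.GroupInfo;

    public class NamespaceGroupExample {
      public static void createGroupedNamespace(Admin admin) throws IOException {
        NamespaceDescriptor desc = NamespaceDescriptor.create("sales")
            .addConfiguration(GroupInfo.NAMESPACEDESC_PROP_GROUP, "group_a")
            .build();
        //HMaster.createNamespace rejects this with a ConstraintException
        //if group_a has not been created yet
        admin.createNamespace(desc);
      }
    }
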
http://git-wip-us.apache.org/repos/asf/hbase/blob/16f65bad/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
index c4eecfa..e942713 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
@@ -51,6 +51,9 @@ import org.apache.hadoop.hbase.Stoppable;
 @InterfaceAudience.Private
 public interface LoadBalancer extends Configurable, Stoppable, ConfigurationObserver {
 
+  //used to signal to the caller that the region(s) cannot be assigned
+  ServerName BOGUS_SERVER_NAME = ServerName.parseServerName("127.0.0.1,1,1");
+
   /**
    * Set the current cluster status.  This allows a LoadBalancer to map host name to a server
    * @param st

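A producer-side sketch of the sentinel, not part of this patch (GroupBasedLoadBalancer is the real producer): a balancer that cannot place some regions parks them under BOGUS_SERVER_NAME, and the AssignmentManager hunks above then mark those regions FAILED_OPEN instead of retrying forever.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.master.LoadBalancer;

    public class BogusPlanExample {
      /** Files regions that found no eligible server under the sentinel bucket. */
      public static void parkUnassignable(Map<ServerName, List<HRegionInfo>> plan,
          List<HRegionInfo> unplaced) {
        List<HRegionInfo> bucket = plan.get(LoadBalancer.BOGUS_SERVER_NAME);
        if (bucket == null) {
          bucket = new ArrayList<HRegionInfo>();
          plan.put(LoadBalancer.BOGUS_SERVER_NAME, bucket);
        }
        bucket.addAll(unplaced);
      }
    }
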
http://git-wip-us.apache.org/repos/asf/hbase/blob/16f65bad/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
index 16b8852..7a3b01f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.lang.ClassUtils;
 import org.apache.commons.logging.Log;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HostPort;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -1101,4 +1103,116 @@ public class MasterCoprocessorHost
     }
     return bypass;
   }
+
+  public void preMoveServers(final Set<HostPort> servers, final String targetGroup)
+      throws IOException {
+    execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+      @Override
+      public void call(MasterObserver oserver,
+          ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
+        oserver.preMoveServers(ctx, servers, targetGroup);
+      }
+    });
+  }
+
+  public void postMoveServers(final Set<HostPort> servers, final String targetGroup)
+      throws IOException {
+    execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+      @Override
+      public void call(MasterObserver oserver,
+          ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
+        oserver.postMoveServers(ctx, servers, targetGroup);
+      }
+    });
+  }
+
+  public void preMoveTables(final Set<TableName> tables, final String targetGroup)
+      throws IOException {
+    execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+      @Override
+      public void call(MasterObserver oserver,
+          ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
+        oserver.preMoveTables(ctx, tables, targetGroup);
+      }
+    });
+  }
+
+  public void postMoveTables(final Set<TableName> tables, final String targetGroup)
+      throws IOException {
+    execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+      @Override
+      public void call(MasterObserver oserver,
+          ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
+        oserver.postMoveTables(ctx, tables, targetGroup);
+      }
+    });
+  }
+
+  public void preAddGroup(final String name)
+      throws IOException {
+    execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+      @Override
+      public void call(MasterObserver oserver,
+          ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
+        oserver.preAddGroup(ctx, name);
+      }
+    });
+  }
+
+  public void postAddGroup(final String name)
+      throws IOException {
+    execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+      @Override
+      public void call(MasterObserver oserver,
+          ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
+        oserver.postAddGroup(ctx, name);
+      }
+    });
+  }
+
+  public void preRemoveGroup(final String name)
+      throws IOException {
+    execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+      @Override
+      public void call(MasterObserver oserver,
+          ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
+        oserver.preRemoveGroup(ctx, name);
+      }
+    });
+  }
+
+  public void postRemoveGroup(final String name)
+      throws IOException {
+    execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+      @Override
+      public void call(MasterObserver oserver,
+          ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
+        oserver.postRemoveGroup(ctx, name);
+      }
+    });
+  }
+
+  public void preBalanceGroup(final String name)
+      throws IOException {
+    execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+      @Override
+      public void call(MasterObserver oserver,
+          ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
+        oserver.preBalanceGroup(ctx, name);
+      }
+    });
+  }
+
+  public void postBalanceGroup(final String name, final boolean balanceRan)
+      throws IOException {
+    execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+      @Override
+      public void call(MasterObserver oserver,
+          ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
+        oserver.postBalanceGroup(ctx, name, balanceRan);
+      }
+    });
+  }
+
 }
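
These host methods fan out to MasterObserver, so coprocessors can veto or audit group operations. A consumer sketch, not part of this patch (it assumes the patch adds matching no-op defaults to BaseMasterObserver):

    import java.io.IOException;
    import java.util.Set;

    import org.apache.hadoop.hbase.HostPort;
    import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
    import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.group.GroupInfo;

    public class GroupGuardObserver extends BaseMasterObserver {
      @Override
      public void preMoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
          Set<HostPort> servers, String targetGroup) throws IOException {
        //vetoing by exception is the usual MasterObserver convention
        if (GroupInfo.DEFAULT_GROUP.equals(targetGroup)) {
          throw new IOException("moves into the default group are blocked by policy here");
        }
      }
    }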