You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/07/19 00:44:48 UTC
[06/21] hbase git commit: HBASE-15631 Backport Regionserver Groups
(HBASE-6721) to branch-1
http://git-wip-us.apache.org/repos/asf/hbase/blob/a0a7f6f4/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
new file mode 100644
index 0000000..7fcb7c7
--- /dev/null
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
@@ -0,0 +1,758 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.rsgroup;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import com.google.common.collect.Sets;
+import com.google.common.net.HostAndPort;
+import com.google.protobuf.ServiceException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableStateManager;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.constraint.ConstraintException;
+import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.ServerListener;
+import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
+import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
+import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
+import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ModifyRegionUtils;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This is an implementation of {@link RSGroupInfoManager}. Which makes
+ * use of an HBase table as the persistence store for the group information.
+ * It also makes use of zookeeper to store group information needed
+ * for bootstrapping during offline mode.
+ */
+public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListener {
+ private static final Log LOG = LogFactory.getLog(RSGroupInfoManagerImpl.class);
+
+ /** Table descriptor for <code>hbase:rsgroup</code> catalog table */
+ private final static HTableDescriptor RSGROUP_TABLE_DESC;
+ static {
+ RSGROUP_TABLE_DESC = new HTableDescriptor(RSGROUP_TABLE_NAME_BYTES);
+ RSGROUP_TABLE_DESC.addFamily(new HColumnDescriptor(META_FAMILY_BYTES));
+ RSGROUP_TABLE_DESC.setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
+ try {
+ RSGROUP_TABLE_DESC.addCoprocessor(
+ MultiRowMutationEndpoint.class.getName(),
+ null, Coprocessor.PRIORITY_SYSTEM, null);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ private volatile Map<String, RSGroupInfo> rsGroupMap;
+ private volatile Map<TableName, String> tableMap;
+ private MasterServices master;
+ private Table rsGroupTable;
+ private ClusterConnection conn;
+ private ZooKeeperWatcher watcher;
+ private RSGroupStartupWorker rsGroupStartupWorker;
+ // contains list of groups that were last flushed to persistent store
+ private volatile Set<String> prevRSGroups;
+ private RSGroupSerDe rsGroupSerDe;
+ private DefaultServerUpdater defaultServerUpdater;
+
+
+ public RSGroupInfoManagerImpl(MasterServices master) throws IOException {
+ this.rsGroupMap = Collections.emptyMap();
+ this.tableMap = Collections.emptyMap();
+ rsGroupSerDe = new RSGroupSerDe();
+ this.master = master;
+ this.watcher = master.getZooKeeper();
+ this.conn = master.getConnection();
+ rsGroupStartupWorker = new RSGroupStartupWorker(this, master, conn);
+ prevRSGroups = new HashSet<String>();
+ refresh();
+ rsGroupStartupWorker.start();
+ defaultServerUpdater = new DefaultServerUpdater(this);
+ master.getServerManager().registerListener(this);
+ defaultServerUpdater.start();
+ }
+
+ /**
+ * Adds the group.
+ *
+ * @param rsGroupInfo the group name
+ */
+ @Override
+ public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException {
+ checkGroupName(rsGroupInfo.getName());
+ if (rsGroupMap.get(rsGroupInfo.getName()) != null ||
+ rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
+ throw new DoNotRetryIOException("Group already exists: "+ rsGroupInfo.getName());
+ }
+ Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
+ newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo);
+ flushConfig(newGroupMap);
+ }
+
+ @Override
+ public synchronized boolean moveServers(Set<HostAndPort> hostPorts, String srcGroup,
+ String dstGroup) throws IOException {
+ if (!rsGroupMap.containsKey(srcGroup)) {
+ throw new DoNotRetryIOException("Group "+srcGroup+" does not exist");
+ }
+ if (!rsGroupMap.containsKey(dstGroup)) {
+ throw new DoNotRetryIOException("Group "+dstGroup+" does not exist");
+ }
+
+ RSGroupInfo src = new RSGroupInfo(getRSGroup(srcGroup));
+ RSGroupInfo dst = new RSGroupInfo(getRSGroup(dstGroup));
+ boolean foundOne = false;
+ for(HostAndPort el: hostPorts) {
+ foundOne = src.removeServer(el) || foundOne;
+ dst.addServer(el);
+ }
+
+ Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
+ newGroupMap.put(src.getName(), src);
+ newGroupMap.put(dst.getName(), dst);
+
+ flushConfig(newGroupMap);
+ return foundOne;
+ }
+
+ /**
+ * Gets the group info of server.
+ *
+ * @param hostPort the server
+ * @return An instance of GroupInfo.
+ */
+ @Override
+ public RSGroupInfo getRSGroupOfServer(HostAndPort hostPort) throws IOException {
+ for (RSGroupInfo info : rsGroupMap.values()) {
+ if (info.containsServer(hostPort)){
+ return info;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Gets the group information.
+ *
+ * @param groupName
+ * the group name
+ * @return An instance of GroupInfo
+ */
+ @Override
+ public RSGroupInfo getRSGroup(String groupName) throws IOException {
+ RSGroupInfo RSGroupInfo = rsGroupMap.get(groupName);
+ return RSGroupInfo;
+ }
+
+
+
+ @Override
+ public String getRSGroupOfTable(TableName tableName) throws IOException {
+ return tableMap.get(tableName);
+ }
+
+ @Override
+ public synchronized void moveTables(
+ Set<TableName> tableNames, String groupName) throws IOException {
+ if (groupName != null && !rsGroupMap.containsKey(groupName)) {
+ throw new DoNotRetryIOException("Group "+groupName+" does not exist or is a special group");
+ }
+
+ Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
+ for(TableName tableName: tableNames) {
+ if (tableMap.containsKey(tableName)) {
+ RSGroupInfo src = new RSGroupInfo(rsGroupMap.get(tableMap.get(tableName)));
+ src.removeTable(tableName);
+ newGroupMap.put(src.getName(), src);
+ }
+ if(groupName != null) {
+ RSGroupInfo dst = new RSGroupInfo(newGroupMap.get(groupName));
+ dst.addTable(tableName);
+ newGroupMap.put(dst.getName(), dst);
+ }
+ }
+
+ flushConfig(newGroupMap);
+ }
+
+
+ /**
+ * Delete a region server group.
+ *
+ * @param groupName the group name
+ * @throws java.io.IOException Signals that an I/O exception has occurred.
+ */
+ @Override
+ public synchronized void removeRSGroup(String groupName) throws IOException {
+ if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
+ throw new DoNotRetryIOException("Group "+groupName+" does not exist or is a reserved group");
+ }
+ Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
+ newGroupMap.remove(groupName);
+ flushConfig(newGroupMap);
+ }
+
+ @Override
+ public List<RSGroupInfo> listRSGroups() throws IOException {
+ List<RSGroupInfo> list = Lists.newLinkedList(rsGroupMap.values());
+ return list;
+ }
+
+ @Override
+ public boolean isOnline() {
+ return rsGroupStartupWorker.isOnline();
+ }
+
+ @Override
+ public synchronized void refresh() throws IOException {
+ refresh(false);
+ }
+
+ private synchronized void refresh(boolean forceOnline) throws IOException {
+ List<RSGroupInfo> groupList = new LinkedList<RSGroupInfo>();
+
+ // overwrite anything read from zk, group table is source of truth
+ // if online read from GROUP table
+ if (forceOnline || isOnline()) {
+ LOG.debug("Refreshing in Online mode.");
+ if (rsGroupTable == null) {
+ rsGroupTable = conn.getTable(RSGROUP_TABLE_NAME);
+ }
+ groupList.addAll(rsGroupSerDe.retrieveGroupList(rsGroupTable));
+ } else {
+ LOG.debug("Refershing in Offline mode.");
+ String groupBasePath = ZKUtil.joinZNode(watcher.baseZNode, rsGroupZNode);
+ groupList.addAll(rsGroupSerDe.retrieveGroupList(watcher, groupBasePath));
+ }
+
+ // refresh default group, prune
+ NavigableSet<TableName> orphanTables = new TreeSet<TableName>();
+ for(String entry: master.getTableDescriptors().getAll().keySet()) {
+ orphanTables.add(TableName.valueOf(entry));
+ }
+
+ List<TableName> specialTables;
+ if(!master.isInitialized()) {
+ specialTables = new ArrayList<TableName>();
+ specialTables.add(AccessControlLists.ACL_TABLE_NAME);
+ specialTables.add(TableName.META_TABLE_NAME);
+ specialTables.add(TableName.NAMESPACE_TABLE_NAME);
+ specialTables.add(RSGROUP_TABLE_NAME);
+ } else {
+ specialTables =
+ master.listTableNamesByNamespace(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR);
+ }
+
+ for(TableName table : specialTables) {
+ orphanTables.add(table);
+ }
+ for(RSGroupInfo group: groupList) {
+ if(!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
+ orphanTables.removeAll(group.getTables());
+ }
+ }
+
+ // This is added to the last of the list
+ // so it overwrites the default group loaded
+ // from region group table or zk
+ groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP,
+ Sets.newHashSet(getDefaultServers()),
+ orphanTables));
+
+
+ // populate the data
+ HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap();
+ HashMap<TableName, String> newTableMap = Maps.newHashMap();
+ for (RSGroupInfo group : groupList) {
+ newGroupMap.put(group.getName(), group);
+ for(TableName table: group.getTables()) {
+ newTableMap.put(table, group.getName());
+ }
+ }
+ rsGroupMap = Collections.unmodifiableMap(newGroupMap);
+ tableMap = Collections.unmodifiableMap(newTableMap);
+
+ prevRSGroups.clear();
+ prevRSGroups.addAll(rsGroupMap.keySet());
+ }
+
+ private synchronized Map<TableName,String> flushConfigTable(Map<String,RSGroupInfo> newGroupMap)
+ throws IOException {
+ Map<TableName,String> newTableMap = Maps.newHashMap();
+ List<Mutation> mutations = Lists.newArrayList();
+
+ // populate deletes
+ for(String groupName : prevRSGroups) {
+ if(!newGroupMap.containsKey(groupName)) {
+ Delete d = new Delete(Bytes.toBytes(groupName));
+ mutations.add(d);
+ }
+ }
+
+ // populate puts
+ for(RSGroupInfo RSGroupInfo : newGroupMap.values()) {
+ RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(RSGroupInfo);
+ Put p = new Put(Bytes.toBytes(RSGroupInfo.getName()));
+ p.addColumn(META_FAMILY_BYTES,
+ META_QUALIFIER_BYTES,
+ proto.toByteArray());
+ mutations.add(p);
+ for(TableName entry: RSGroupInfo.getTables()) {
+ newTableMap.put(entry, RSGroupInfo.getName());
+ }
+ }
+
+ if(mutations.size() > 0) {
+ multiMutate(mutations);
+ }
+ return newTableMap;
+ }
+
+ private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) throws IOException {
+ Map<TableName, String> newTableMap;
+
+ // For offline mode persistence is still unavailable
+ // We're refreshing in-memory state but only for default servers
+ if (!isOnline()) {
+ Map<String, RSGroupInfo> m = Maps.newHashMap(rsGroupMap);
+ RSGroupInfo oldDefaultGroup = m.remove(RSGroupInfo.DEFAULT_GROUP);
+ RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
+ if (!m.equals(newGroupMap) ||
+ !oldDefaultGroup.getTables().equals(newDefaultGroup.getTables())) {
+ throw new IOException("Only default servers can be updated during offline mode");
+ }
+ newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroup);
+ rsGroupMap = newGroupMap;
+ return;
+ }
+
+ newTableMap = flushConfigTable(newGroupMap);
+
+ // make changes visible since it has been
+ // persisted in the source of truth
+ rsGroupMap = Collections.unmodifiableMap(newGroupMap);
+ tableMap = Collections.unmodifiableMap(newTableMap);
+
+
+ try {
+ String groupBasePath = ZKUtil.joinZNode(watcher.baseZNode, rsGroupZNode);
+ ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufUtil.PB_MAGIC);
+
+ List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<ZKUtil.ZKUtilOp>(newGroupMap.size());
+ for(String groupName : prevRSGroups) {
+ if(!newGroupMap.containsKey(groupName)) {
+ String znode = ZKUtil.joinZNode(groupBasePath, groupName);
+ zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
+ }
+ }
+
+
+ for(RSGroupInfo RSGroupInfo : newGroupMap.values()) {
+ String znode = ZKUtil.joinZNode(groupBasePath, RSGroupInfo.getName());
+ RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(RSGroupInfo);
+ LOG.debug("Updating znode: "+znode);
+ ZKUtil.createAndFailSilent(watcher, znode);
+ zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
+ zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode,
+ ProtobufUtil.prependPBMagic(proto.toByteArray())));
+ }
+ LOG.debug("Writing ZK GroupInfo count: " + zkOps.size());
+
+ ZKUtil.multiOrSequential(watcher, zkOps, false);
+ } catch (KeeperException e) {
+ LOG.error("Failed to write to rsGroupZNode", e);
+ master.abort("Failed to write to rsGroupZNode", e);
+ throw new IOException("Failed to write to rsGroupZNode",e);
+ }
+
+ prevRSGroups.clear();
+ prevRSGroups.addAll(newGroupMap.keySet());
+ }
+
+ private List<ServerName> getOnlineRS() throws IOException {
+ if (master != null) {
+ return master.getServerManager().getOnlineServersList();
+ }
+ try {
+ LOG.debug("Reading online RS from zookeeper");
+ List<ServerName> servers = new LinkedList<ServerName>();
+ for (String el: ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode)) {
+ servers.add(ServerName.parseServerName(el));
+ }
+ return servers;
+ } catch (KeeperException e) {
+ throw new IOException("Failed to retrieve server list from zookeeper", e);
+ }
+ }
+
+ private List<HostAndPort> getDefaultServers() throws IOException {
+ List<HostAndPort> defaultServers = new LinkedList<HostAndPort>();
+ for(ServerName server : getOnlineRS()) {
+ HostAndPort hostPort = HostAndPort.fromParts(server.getHostname(), server.getPort());
+ boolean found = false;
+ for(RSGroupInfo RSGroupInfo : rsGroupMap.values()) {
+ if(!RSGroupInfo.DEFAULT_GROUP.equals(RSGroupInfo.getName()) &&
+ RSGroupInfo.containsServer(hostPort)) {
+ found = true;
+ break;
+ }
+ }
+ if(!found) {
+ defaultServers.add(hostPort);
+ }
+ }
+ return defaultServers;
+ }
+
+ private synchronized void updateDefaultServers(
+ Set<HostAndPort> hostPort) throws IOException {
+ RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP);
+ RSGroupInfo newInfo = new RSGroupInfo(info.getName(), hostPort, info.getTables());
+ HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
+ newGroupMap.put(newInfo.getName(), newInfo);
+ flushConfig(newGroupMap);
+ }
+
+ @Override
+ public void serverAdded(ServerName serverName) {
+ defaultServerUpdater.serverChanged();
+ }
+
+ @Override
+ public void serverRemoved(ServerName serverName) {
+ defaultServerUpdater.serverChanged();
+ }
+
+ private static class DefaultServerUpdater extends Thread {
+ private static final Log LOG = LogFactory.getLog(DefaultServerUpdater.class);
+ private RSGroupInfoManagerImpl mgr;
+ private boolean hasChanged = false;
+
+ public DefaultServerUpdater(RSGroupInfoManagerImpl mgr) {
+ this.mgr = mgr;
+ }
+
+ @Override
+ public void run() {
+ List<HostAndPort> prevDefaultServers = new LinkedList<HostAndPort>();
+ while(!mgr.master.isAborted() || !mgr.master.isStopped()) {
+ try {
+ LOG.info("Updating default servers.");
+ List<HostAndPort> servers = mgr.getDefaultServers();
+ Collections.sort(servers, new Comparator<HostAndPort>() {
+ @Override
+ public int compare(HostAndPort o1, HostAndPort o2) {
+ int diff = o1.getHostText().compareTo(o2.getHostText());
+ if (diff != 0) {
+ return diff;
+ }
+ return o1.getPort() - o2.getPort();
+ }
+ });
+ if(!servers.equals(prevDefaultServers)) {
+ mgr.updateDefaultServers(Sets.<HostAndPort>newHashSet(servers));
+ prevDefaultServers = servers;
+ LOG.info("Updated with servers: "+servers.size());
+ }
+ try {
+ synchronized (this) {
+ if(!hasChanged) {
+ wait();
+ }
+ hasChanged = false;
+ }
+ } catch (InterruptedException e) {
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to update default servers", e);
+ }
+ }
+ }
+
+ public void serverChanged() {
+ synchronized (this) {
+ hasChanged = true;
+ this.notify();
+ }
+ }
+ }
+
+ @Override
+ public void waiting() {
+
+ }
+
+ private static class RSGroupStartupWorker extends Thread {
+ private static final Log LOG = LogFactory.getLog(RSGroupStartupWorker.class);
+
+ private Configuration conf;
+ private volatile boolean isOnline = false;
+ private MasterServices masterServices;
+ private RSGroupInfoManagerImpl groupInfoManager;
+ private ClusterConnection conn;
+
+ public RSGroupStartupWorker(RSGroupInfoManagerImpl groupInfoManager,
+ MasterServices masterServices,
+ ClusterConnection conn) {
+ this.conf = masterServices.getConfiguration();
+ this.masterServices = masterServices;
+ this.groupInfoManager = groupInfoManager;
+ this.conn = conn;
+ setName(RSGroupStartupWorker.class.getName()+"-"+masterServices.getServerName());
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ if(waitForGroupTableOnline()) {
+ LOG.info("GroupBasedLoadBalancer is now online");
+ }
+ }
+
+ public boolean waitForGroupTableOnline() {
+ final List<HRegionInfo> foundRegions = new LinkedList<HRegionInfo>();
+ final List<HRegionInfo> assignedRegions = new LinkedList<HRegionInfo>();
+ final AtomicBoolean found = new AtomicBoolean(false);
+ final TableStateManager tsm =
+ masterServices.getAssignmentManager().getTableStateManager();
+ boolean createSent = false;
+ while (!found.get() && isMasterRunning()) {
+ foundRegions.clear();
+ assignedRegions.clear();
+ found.set(true);
+ try {
+ final Table nsTable = conn.getTable(TableName.NAMESPACE_TABLE_NAME);
+ final Table groupTable = conn.getTable(RSGROUP_TABLE_NAME);
+ boolean rootMetaFound =
+ masterServices.getMetaTableLocator().verifyMetaRegionLocation(
+ conn,
+ masterServices.getZooKeeper(),
+ 1);
+ final AtomicBoolean nsFound = new AtomicBoolean(false);
+ if (rootMetaFound) {
+
+ MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
+ @Override
+ public boolean visit(Result row) throws IOException {
+
+ HRegionInfo info = MetaTableAccessor.getHRegionInfo(row);
+ if (info != null) {
+ Cell serverCell =
+ row.getColumnLatestCell(HConstants.CATALOG_FAMILY,
+ HConstants.SERVER_QUALIFIER);
+ if (RSGROUP_TABLE_NAME.equals(info.getTable()) && serverCell != null) {
+ ServerName sn =
+ ServerName.parseVersionedServerName(CellUtil.cloneValue(serverCell));
+ if (sn == null) {
+ found.set(false);
+ } else if (tsm.isTableState(RSGROUP_TABLE_NAME,
+ ZooKeeperProtos.Table.State.ENABLED)) {
+ try {
+ ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn);
+ ClientProtos.GetRequest request =
+ RequestConverter.buildGetRequest(info.getRegionName(),
+ new Get(ROW_KEY));
+ rs.get(null, request);
+ assignedRegions.add(info);
+ } catch(Exception ex) {
+ LOG.debug("Caught exception while verifying group region", ex);
+ }
+ }
+ foundRegions.add(info);
+ }
+ if (TableName.NAMESPACE_TABLE_NAME.equals(info.getTable())) {
+ Cell cell = row.getColumnLatestCell(HConstants.CATALOG_FAMILY,
+ HConstants.SERVER_QUALIFIER);
+ ServerName sn = null;
+ if(cell != null) {
+ sn = ServerName.parseVersionedServerName(CellUtil.cloneValue(cell));
+ }
+ if (tsm.isTableState(TableName.NAMESPACE_TABLE_NAME,
+ ZooKeeperProtos.Table.State.ENABLED)) {
+ try {
+ ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn);
+ ClientProtos.GetRequest request =
+ RequestConverter.buildGetRequest(info.getRegionName(),
+ new Get(ROW_KEY));
+ rs.get(null, request);
+ nsFound.set(true);
+ } catch(Exception ex) {
+ LOG.debug("Caught exception while verifying group region", ex);
+ }
+ }
+ }
+ }
+ return true;
+ }
+ };
+ MetaTableAccessor.fullScan(conn, visitor);
+ // if no regions in meta then we have to create the table
+ if (foundRegions.size() < 1 && rootMetaFound && !createSent && nsFound.get()) {
+ groupInfoManager.createGroupTable(masterServices);
+ createSent = true;
+ }
+ LOG.info("Group table: " + RSGROUP_TABLE_NAME + " isOnline: " + found.get()
+ + ", regionCount: " + foundRegions.size() + ", assignCount: "
+ + assignedRegions.size() + ", rootMetaFound: "+rootMetaFound);
+ found.set(found.get() && assignedRegions.size() == foundRegions.size()
+ && foundRegions.size() > 0);
+ } else {
+ LOG.info("Waiting for catalog tables to come online");
+ found.set(false);
+ }
+ if (found.get()) {
+ LOG.debug("With group table online, refreshing cached information.");
+ groupInfoManager.refresh(true);
+ isOnline = true;
+ //flush any inconsistencies between ZK and HTable
+ groupInfoManager.flushConfig(groupInfoManager.rsGroupMap);
+ }
+ } catch(Exception e) {
+ found.set(false);
+ LOG.warn("Failed to perform check", e);
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ LOG.info("Sleep interrupted", e);
+ }
+ }
+ return found.get();
+ }
+
+ public boolean isOnline() {
+ return isOnline;
+ }
+
+ private boolean isMasterRunning() {
+ return !masterServices.isAborted() && !masterServices.isStopped();
+ }
+ }
+
+ private void createGroupTable(MasterServices masterServices) throws IOException {
+ HRegionInfo[] newRegions =
+ ModifyRegionUtils.createHRegionInfos(RSGROUP_TABLE_DESC, null);
+ ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
+ masterServices.getMasterProcedureExecutor().submitProcedure(
+ new CreateTableProcedure(
+ masterServices.getMasterProcedureExecutor().getEnvironment(),
+ RSGROUP_TABLE_DESC,
+ newRegions,
+ latch));
+ latch.await();
+ // wait for region to be online
+ int tries = 600;
+ while(masterServices.getAssignmentManager().getRegionStates()
+ .getRegionServerOfRegion(newRegions[0]) == null && tries > 0) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ throw new IOException("Wait interrupted", e);
+ }
+ tries--;
+ }
+ if(tries <= 0) {
+ throw new IOException("Failed to create group table.");
+ }
+ }
+
+ private void multiMutate(List<Mutation> mutations)
+ throws IOException {
+ CoprocessorRpcChannel channel = rsGroupTable.coprocessorService(ROW_KEY);
+ MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder
+ = MultiRowMutationProtos.MutateRowsRequest.newBuilder();
+ for (Mutation mutation : mutations) {
+ if (mutation instanceof Put) {
+ mmrBuilder.addMutationRequest(ProtobufUtil.toMutation(
+ ClientProtos.MutationProto.MutationType.PUT, mutation));
+ } else if (mutation instanceof Delete) {
+ mmrBuilder.addMutationRequest(ProtobufUtil.toMutation(
+ ClientProtos.MutationProto.MutationType.DELETE, mutation));
+ } else {
+ throw new DoNotRetryIOException("multiMutate doesn't support "
+ + mutation.getClass().getName());
+ }
+ }
+
+ MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service =
+ MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel);
+ try {
+ service.mutateRows(null, mmrBuilder.build());
+ } catch (ServiceException ex) {
+ ProtobufUtil.toIOException(ex);
+ }
+ }
+
+ private void checkGroupName(String groupName) throws ConstraintException {
+ if(!groupName.matches("[a-zA-Z0-9_]+")) {
+ throw new ConstraintException("Group name should only contain alphanumeric characters");
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a0a7f6f4/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupSerDe.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupSerDe.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupSerDe.java
new file mode 100644
index 0000000..530db58
--- /dev/null
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupSerDe.java
@@ -0,0 +1,88 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.rsgroup;
+
+import com.google.common.collect.Lists;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+//TODO do better encapsulation of SerDe logic from GroupInfoManager and GroupTracker
+public class RSGroupSerDe {
+ private static final Log LOG = LogFactory.getLog(RSGroupSerDe.class);
+
+ public RSGroupSerDe() {
+
+ }
+
+ public List<RSGroupInfo> retrieveGroupList(Table groupTable) throws IOException {
+ List<RSGroupInfo> RSGroupInfoList = Lists.newArrayList();
+ for (Result result : groupTable.getScanner(new Scan())) {
+ RSGroupProtos.RSGroupInfo proto =
+ RSGroupProtos.RSGroupInfo.parseFrom(
+ result.getValue(
+ RSGroupInfoManager.META_FAMILY_BYTES,
+ RSGroupInfoManager.META_QUALIFIER_BYTES));
+ RSGroupInfoList.add(ProtobufUtil.toGroupInfo(proto));
+ }
+ return RSGroupInfoList;
+ }
+
+ public List<RSGroupInfo> retrieveGroupList(ZooKeeperWatcher watcher,
+ String groupBasePath) throws IOException {
+ List<RSGroupInfo> RSGroupInfoList = Lists.newArrayList();
+ //Overwrite any info stored by table, this takes precedence
+ try {
+ if(ZKUtil.checkExists(watcher, groupBasePath) != -1) {
+ for(String znode: ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath)) {
+ byte[] data = ZKUtil.getData(watcher, ZKUtil.joinZNode(groupBasePath, znode));
+ if(data.length > 0) {
+ ProtobufUtil.expectPBMagicPrefix(data);
+ ByteArrayInputStream bis = new ByteArrayInputStream(
+ data, ProtobufUtil.lengthOfPBMagic(), data.length);
+ RSGroupInfoList.add(ProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)));
+ }
+ }
+ LOG.debug("Read ZK GroupInfo count:" + RSGroupInfoList.size());
+ }
+ } catch (KeeperException e) {
+ throw new IOException("Failed to read rsGroupZNode",e);
+ } catch (DeserializationException e) {
+ throw new IOException("Failed to read rsGroupZNode",e);
+ } catch (InterruptedException e) {
+ throw new IOException("Failed to read rsGroupZNode",e);
+ }
+ return RSGroupInfoList;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a0a7f6f4/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupableBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupableBalancer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupableBalancer.java
new file mode 100644
index 0000000..ec86dda
--- /dev/null
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupableBalancer.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.rsgroup;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.master.LoadBalancer;
+
+
+@InterfaceAudience.Private
+public interface RSGroupableBalancer extends LoadBalancer {
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a0a7f6f4/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java
new file mode 100644
index 0000000..1539f73
--- /dev/null
+++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java
@@ -0,0 +1,574 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.net.HostAndPort;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
+import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
+import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+//TODO use stochastic based load balancer instead
+@Category(SmallTests.class)
+public class TestRSGroupBasedLoadBalancer {
+
+ private static final Log LOG = LogFactory.getLog(TestRSGroupBasedLoadBalancer.class);
+ private static RSGroupBasedLoadBalancer loadBalancer;
+ private static SecureRandom rand;
+
+ static String[] groups = new String[] { RSGroupInfo.DEFAULT_GROUP, "dg2", "dg3",
+ "dg4" };
+ static TableName[] tables =
+ new TableName[] { TableName.valueOf("dt1"),
+ TableName.valueOf("dt2"),
+ TableName.valueOf("dt3"),
+ TableName.valueOf("dt4")};
+ static List<ServerName> servers;
+ static Map<String, RSGroupInfo> groupMap;
+ static Map<TableName, String> tableMap;
+ static List<HTableDescriptor> tableDescs;
+ int[] regionAssignment = new int[] { 2, 5, 7, 10, 4, 3, 1 };
+ static int regionId = 0;
+
+ @BeforeClass
+ public static void beforeAllTests() throws Exception {
+ rand = new SecureRandom();
+ servers = generateServers(7);
+ groupMap = constructGroupInfo(servers, groups);
+ tableMap = new HashMap<TableName, String>();
+ tableDescs = constructTableDesc();
+ Configuration conf = HBaseConfiguration.create();
+ conf.set("hbase.regions.slop", "0");
+ conf.set("hbase.group.grouploadbalancer.class", SimpleLoadBalancer.class.getCanonicalName());
+ loadBalancer = new RSGroupBasedLoadBalancer(getMockedGroupInfoManager());
+ loadBalancer.setMasterServices(getMockedMaster());
+ loadBalancer.setConf(conf);
+ loadBalancer.initialize();
+ }
+
+ /**
+ * Test the load balancing algorithm.
+ *
+ * Invariant is that all servers of the group should be hosting either floor(average) or
+ * ceiling(average)
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testBalanceCluster() throws Exception {
+ Map<ServerName, List<HRegionInfo>> servers = mockClusterServers();
+ ArrayListMultimap<String, ServerAndLoad> list = convertToGroupBasedMap(servers);
+ LOG.info("Mock Cluster : " + printStats(list));
+ List<RegionPlan> plans = loadBalancer.balanceCluster(servers);
+ ArrayListMultimap<String, ServerAndLoad> balancedCluster = reconcile(
+ list, plans);
+ LOG.info("Mock Balance : " + printStats(balancedCluster));
+ assertClusterAsBalanced(balancedCluster);
+ }
+
+ /**
+ * Invariant is that all servers of a group have load between floor(avg) and
+ * ceiling(avg) number of regions.
+ */
+ private void assertClusterAsBalanced(
+ ArrayListMultimap<String, ServerAndLoad> groupLoadMap) {
+ for (String gName : groupLoadMap.keySet()) {
+ List<ServerAndLoad> groupLoad = groupLoadMap.get(gName);
+ int numServers = groupLoad.size();
+ int numRegions = 0;
+ int maxRegions = 0;
+ int minRegions = Integer.MAX_VALUE;
+ for (ServerAndLoad server : groupLoad) {
+ int nr = server.getLoad();
+ if (nr > maxRegions) {
+ maxRegions = nr;
+ }
+ if (nr < minRegions) {
+ minRegions = nr;
+ }
+ numRegions += nr;
+ }
+ if (maxRegions - minRegions < 2) {
+ // less than 2 between max and min, can't balance
+ return;
+ }
+ int min = numRegions / numServers;
+ int max = numRegions % numServers == 0 ? min : min + 1;
+
+ for (ServerAndLoad server : groupLoad) {
+ assertTrue(server.getLoad() <= max);
+ assertTrue(server.getLoad() >= min);
+ }
+ }
+ }
+
+ /**
+ * All regions have an assignment.
+ *
+ * @param regions
+ * @param servers
+ * @param assignments
+ * @throws java.io.IOException
+ * @throws java.io.FileNotFoundException
+ */
+ private void assertImmediateAssignment(List<HRegionInfo> regions,
+ List<ServerName> servers,
+ Map<HRegionInfo, ServerName> assignments)
+ throws IOException {
+ for (HRegionInfo region : regions) {
+ assertTrue(assignments.containsKey(region));
+ ServerName server = assignments.get(region);
+ TableName tableName = region.getTable();
+
+ String groupName =
+ getMockedGroupInfoManager().getRSGroupOfTable(tableName);
+ assertTrue(StringUtils.isNotEmpty(groupName));
+ RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup(groupName);
+ assertTrue("Region is not correctly assigned to group servers.",
+ gInfo.containsServer(server.getHostPort()));
+ }
+ }
+
+ /**
+ * Tests the bulk assignment used during cluster startup.
+ *
+ * Round-robin. Should yield a balanced cluster so same invariant as the
+ * load balancer holds, all servers holding either floor(avg) or
+ * ceiling(avg).
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testBulkAssignment() throws Exception {
+ List<HRegionInfo> regions = randomRegions(25);
+ Map<ServerName, List<HRegionInfo>> assignments = loadBalancer
+ .roundRobinAssignment(regions, servers);
+ //test empty region/servers scenario
+ //this should not throw an NPE
+ loadBalancer.roundRobinAssignment(regions,
+ Collections.EMPTY_LIST);
+ //test regular scenario
+ assertTrue(assignments.keySet().size() == servers.size());
+ for (ServerName sn : assignments.keySet()) {
+ List<HRegionInfo> regionAssigned = assignments.get(sn);
+ for (HRegionInfo region : regionAssigned) {
+ TableName tableName = region.getTable();
+ String groupName =
+ getMockedGroupInfoManager().getRSGroupOfTable(tableName);
+ assertTrue(StringUtils.isNotEmpty(groupName));
+ RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup(
+ groupName);
+ assertTrue(
+ "Region is not correctly assigned to group servers.",
+ gInfo.containsServer(sn.getHostPort()));
+ }
+ }
+ ArrayListMultimap<String, ServerAndLoad> loadMap = convertToGroupBasedMap(assignments);
+ assertClusterAsBalanced(loadMap);
+ }
+
+ /**
+ * Test the cluster startup bulk assignment which attempts to retain
+ * assignment info.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testRetainAssignment() throws Exception {
+ // Test simple case where all same servers are there
+ Map<ServerName, List<HRegionInfo>> currentAssignments = mockClusterServers();
+ Map<HRegionInfo, ServerName> inputForTest = new HashMap<HRegionInfo, ServerName>();
+ for (ServerName sn : currentAssignments.keySet()) {
+ for (HRegionInfo region : currentAssignments.get(sn)) {
+ inputForTest.put(region, sn);
+ }
+ }
+ //verify region->null server assignment is handled
+ inputForTest.put(randomRegions(1).get(0), null);
+ Map<ServerName, List<HRegionInfo>> newAssignment = loadBalancer
+ .retainAssignment(inputForTest, servers);
+ assertRetainedAssignment(inputForTest, servers, newAssignment);
+ }
+
+ /**
+ * Asserts a valid retained assignment plan.
+ * <p>
+ * Must meet the following conditions:
+ * <ul>
+ * <li>Every input region has an assignment, and to an online server
+ * <li>If a region had an existing assignment to a server with the same
+ * address a a currently online server, it will be assigned to it
+ * </ul>
+ *
+ * @param existing
+ * @param assignment
+ * @throws java.io.IOException
+ * @throws java.io.FileNotFoundException
+ */
+ private void assertRetainedAssignment(
+ Map<HRegionInfo, ServerName> existing, List<ServerName> servers,
+ Map<ServerName, List<HRegionInfo>> assignment)
+ throws FileNotFoundException, IOException {
+ // Verify condition 1, every region assigned, and to online server
+ Set<ServerName> onlineServerSet = new TreeSet<ServerName>(servers);
+ Set<HRegionInfo> assignedRegions = new TreeSet<HRegionInfo>();
+ for (Map.Entry<ServerName, List<HRegionInfo>> a : assignment.entrySet()) {
+ assertTrue(
+ "Region assigned to server that was not listed as online",
+ onlineServerSet.contains(a.getKey()));
+ for (HRegionInfo r : a.getValue())
+ assignedRegions.add(r);
+ }
+ assertEquals(existing.size(), assignedRegions.size());
+
+ // Verify condition 2, every region must be assigned to correct server.
+ Set<String> onlineHostNames = new TreeSet<String>();
+ for (ServerName s : servers) {
+ onlineHostNames.add(s.getHostname());
+ }
+
+ for (Map.Entry<ServerName, List<HRegionInfo>> a : assignment.entrySet()) {
+ ServerName currentServer = a.getKey();
+ for (HRegionInfo r : a.getValue()) {
+ ServerName oldAssignedServer = existing.get(r);
+ TableName tableName = r.getTable();
+ String groupName =
+ getMockedGroupInfoManager().getRSGroupOfTable(tableName);
+ assertTrue(StringUtils.isNotEmpty(groupName));
+ RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup(
+ groupName);
+ assertTrue(
+ "Region is not correctly assigned to group servers.",
+ gInfo.containsServer(currentServer.getHostPort()));
+ if (oldAssignedServer != null
+ && onlineHostNames.contains(oldAssignedServer
+ .getHostname())) {
+ // this region was previously assigned somewhere, and that
+ // host is still around, then the host must have been is a
+ // different group.
+ if (!oldAssignedServer.getHostPort().equals(currentServer.getHostPort())) {
+ assertFalse(gInfo.containsServer(oldAssignedServer.getHostPort()));
+ }
+ }
+ }
+ }
+ }
+
+ private String printStats(
+ ArrayListMultimap<String, ServerAndLoad> groupBasedLoad) {
+ StringBuffer sb = new StringBuffer();
+ sb.append("\n");
+ for (String groupName : groupBasedLoad.keySet()) {
+ sb.append("Stats for group: " + groupName);
+ sb.append("\n");
+ sb.append(groupMap.get(groupName).getServers());
+ sb.append("\n");
+ List<ServerAndLoad> groupLoad = groupBasedLoad.get(groupName);
+ int numServers = groupLoad.size();
+ int totalRegions = 0;
+ sb.append("Per Server Load: \n");
+ for (ServerAndLoad sLoad : groupLoad) {
+ sb.append("Server :" + sLoad.getServerName() + " Load : "
+ + sLoad.getLoad() + "\n");
+ totalRegions += sLoad.getLoad();
+ }
+ sb.append(" Group Statistics : \n");
+ float average = (float) totalRegions / numServers;
+ int max = (int) Math.ceil(average);
+ int min = (int) Math.floor(average);
+ sb.append("[srvr=" + numServers + " rgns=" + totalRegions + " avg="
+ + average + " max=" + max + " min=" + min + "]");
+ sb.append("\n");
+ sb.append("===============================");
+ sb.append("\n");
+ }
+ return sb.toString();
+ }
+
+ private ArrayListMultimap<String, ServerAndLoad> convertToGroupBasedMap(
+ final Map<ServerName, List<HRegionInfo>> serversMap) throws IOException {
+ ArrayListMultimap<String, ServerAndLoad> loadMap = ArrayListMultimap
+ .create();
+ for (RSGroupInfo gInfo : getMockedGroupInfoManager().listRSGroups()) {
+ Set<HostAndPort> groupServers = gInfo.getServers();
+ for (HostAndPort hostPort : groupServers) {
+ ServerName actual = null;
+ for(ServerName entry: servers) {
+ if(entry.getHostPort().equals(hostPort)) {
+ actual = entry;
+ break;
+ }
+ }
+ List<HRegionInfo> regions = serversMap.get(actual);
+ assertTrue("No load for " + actual, regions != null);
+ loadMap.put(gInfo.getName(),
+ new ServerAndLoad(actual, regions.size()));
+ }
+ }
+ return loadMap;
+ }
+
+ private ArrayListMultimap<String, ServerAndLoad> reconcile(
+ ArrayListMultimap<String, ServerAndLoad> previousLoad,
+ List<RegionPlan> plans) {
+ ArrayListMultimap<String, ServerAndLoad> result = ArrayListMultimap
+ .create();
+ result.putAll(previousLoad);
+ if (plans != null) {
+ for (RegionPlan plan : plans) {
+ ServerName source = plan.getSource();
+ updateLoad(result, source, -1);
+ ServerName destination = plan.getDestination();
+ updateLoad(result, destination, +1);
+ }
+ }
+ return result;
+ }
+
+ private void updateLoad(
+ ArrayListMultimap<String, ServerAndLoad> previousLoad,
+ final ServerName sn, final int diff) {
+ for (String groupName : previousLoad.keySet()) {
+ ServerAndLoad newSAL = null;
+ ServerAndLoad oldSAL = null;
+ for (ServerAndLoad sal : previousLoad.get(groupName)) {
+ if (ServerName.isSameHostnameAndPort(sn, sal.getServerName())) {
+ oldSAL = sal;
+ newSAL = new ServerAndLoad(sn, sal.getLoad() + diff);
+ break;
+ }
+ }
+ if (newSAL != null) {
+ previousLoad.remove(groupName, oldSAL);
+ previousLoad.put(groupName, newSAL);
+ break;
+ }
+ }
+ }
+
+ private Map<ServerName, List<HRegionInfo>> mockClusterServers() throws IOException {
+ assertTrue(servers.size() == regionAssignment.length);
+ Map<ServerName, List<HRegionInfo>> assignment = new TreeMap<ServerName, List<HRegionInfo>>();
+ for (int i = 0; i < servers.size(); i++) {
+ int numRegions = regionAssignment[i];
+ List<HRegionInfo> regions = assignedRegions(numRegions, servers.get(i));
+ assignment.put(servers.get(i), regions);
+ }
+ return assignment;
+ }
+
+ /**
+ * Generate a list of regions evenly distributed between the tables.
+ *
+ * @param numRegions The number of regions to be generated.
+ * @return List of HRegionInfo.
+ */
+ private List<HRegionInfo> randomRegions(int numRegions) {
+ List<HRegionInfo> regions = new ArrayList<HRegionInfo>(numRegions);
+ byte[] start = new byte[16];
+ byte[] end = new byte[16];
+ rand.nextBytes(start);
+ rand.nextBytes(end);
+ int regionIdx = rand.nextInt(tables.length);
+ for (int i = 0; i < numRegions; i++) {
+ Bytes.putInt(start, 0, numRegions << 1);
+ Bytes.putInt(end, 0, (numRegions << 1) + 1);
+ int tableIndex = (i + regionIdx) % tables.length;
+ HRegionInfo hri = new HRegionInfo(
+ tables[tableIndex], start, end, false, regionId++);
+ regions.add(hri);
+ }
+ return regions;
+ }
+
+ /**
+ * Generate assigned regions to a given server using group information.
+ *
+ * @param numRegions the num regions to generate
+ * @param sn the servername
+ * @return the list of regions
+ * @throws java.io.IOException Signals that an I/O exception has occurred.
+ */
+ private List<HRegionInfo> assignedRegions(int numRegions, ServerName sn) throws IOException {
+ List<HRegionInfo> regions = new ArrayList<HRegionInfo>(numRegions);
+ byte[] start = new byte[16];
+ byte[] end = new byte[16];
+ Bytes.putInt(start, 0, numRegions << 1);
+ Bytes.putInt(end, 0, (numRegions << 1) + 1);
+ for (int i = 0; i < numRegions; i++) {
+ TableName tableName = getTableName(sn);
+ HRegionInfo hri = new HRegionInfo(
+ tableName, start, end, false,
+ regionId++);
+ regions.add(hri);
+ }
+ return regions;
+ }
+
+ private static List<ServerName> generateServers(int numServers) {
+ List<ServerName> servers = new ArrayList<ServerName>(numServers);
+ for (int i = 0; i < numServers; i++) {
+ String host = "server" + rand.nextInt(100000);
+ int port = rand.nextInt(60000);
+ servers.add(ServerName.valueOf(host, port, -1));
+ }
+ return servers;
+ }
+
+ /**
+ * Construct group info, with each group having at least one server.
+ *
+ * @param servers the servers
+ * @param groups the groups
+ * @return the map
+ */
+ private static Map<String, RSGroupInfo> constructGroupInfo(
+ List<ServerName> servers, String[] groups) {
+ assertTrue(servers != null);
+ assertTrue(servers.size() >= groups.length);
+ int index = 0;
+ Map<String, RSGroupInfo> groupMap = new HashMap<String, RSGroupInfo>();
+ for (String grpName : groups) {
+ RSGroupInfo RSGroupInfo = new RSGroupInfo(grpName);
+ RSGroupInfo.addServer(servers.get(index).getHostPort());
+ groupMap.put(grpName, RSGroupInfo);
+ index++;
+ }
+ while (index < servers.size()) {
+ int grpIndex = rand.nextInt(groups.length);
+ groupMap.get(groups[grpIndex]).addServer(
+ servers.get(index).getHostPort());
+ index++;
+ }
+ return groupMap;
+ }
+
+ /**
+ * Construct table descriptors evenly distributed between the groups.
+ *
+ * @return the list
+ */
+ private static List<HTableDescriptor> constructTableDesc() {
+ List<HTableDescriptor> tds = Lists.newArrayList();
+ int index = rand.nextInt(groups.length);
+ for (int i = 0; i < tables.length; i++) {
+ HTableDescriptor htd = new HTableDescriptor(tables[i]);
+ int grpIndex = (i + index) % groups.length ;
+ String groupName = groups[grpIndex];
+ tableMap.put(tables[i], groupName);
+ tds.add(htd);
+ }
+ return tds;
+ }
+
+ private static MasterServices getMockedMaster() throws IOException {
+ TableDescriptors tds = Mockito.mock(TableDescriptors.class);
+ Mockito.when(tds.get(tables[0])).thenReturn(tableDescs.get(0));
+ Mockito.when(tds.get(tables[1])).thenReturn(tableDescs.get(1));
+ Mockito.when(tds.get(tables[2])).thenReturn(tableDescs.get(2));
+ Mockito.when(tds.get(tables[3])).thenReturn(tableDescs.get(3));
+ MasterServices services = Mockito.mock(HMaster.class);
+ Mockito.when(services.getTableDescriptors()).thenReturn(tds);
+ AssignmentManager am = Mockito.mock(AssignmentManager.class);
+ Mockito.when(services.getAssignmentManager()).thenReturn(am);
+ return services;
+ }
+
+ private static RSGroupInfoManager getMockedGroupInfoManager() throws IOException {
+ RSGroupInfoManager gm = Mockito.mock(RSGroupInfoManager.class);
+ Mockito.when(gm.getRSGroup(groups[0])).thenReturn(
+ groupMap.get(groups[0]));
+ Mockito.when(gm.getRSGroup(groups[1])).thenReturn(
+ groupMap.get(groups[1]));
+ Mockito.when(gm.getRSGroup(groups[2])).thenReturn(
+ groupMap.get(groups[2]));
+ Mockito.when(gm.getRSGroup(groups[3])).thenReturn(
+ groupMap.get(groups[3]));
+ Mockito.when(gm.listRSGroups()).thenReturn(
+ Lists.newLinkedList(groupMap.values()));
+ Mockito.when(gm.isOnline()).thenReturn(true);
+ Mockito.when(gm.getRSGroupOfTable(Mockito.any(TableName.class)))
+ .thenAnswer(new Answer<String>() {
+ @Override
+ public String answer(InvocationOnMock invocation) throws Throwable {
+ return tableMap.get(invocation.getArguments()[0]);
+ }
+ });
+ return gm;
+ }
+
+ private TableName getTableName(ServerName sn) throws IOException {
+ TableName tableName = null;
+ RSGroupInfoManager gm = getMockedGroupInfoManager();
+ RSGroupInfo groupOfServer = null;
+ for(RSGroupInfo gInfo : gm.listRSGroups()){
+ if(gInfo.containsServer(sn.getHostPort())){
+ groupOfServer = gInfo;
+ break;
+ }
+ }
+
+ for(HTableDescriptor desc : tableDescs){
+ if(gm.getRSGroupOfTable(desc.getTableName()).endsWith(groupOfServer.getName())){
+ tableName = desc.getTableName();
+ }
+ }
+ return tableName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a0a7f6f4/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java
new file mode 100644
index 0000000..34add63
--- /dev/null
+++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java
@@ -0,0 +1,287 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.rsgroup;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import com.google.common.net.HostAndPort;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.Waiter.Predicate;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MediumTests.class})
+public class TestRSGroups extends TestRSGroupsBase {
+ protected static final Log LOG = LogFactory.getLog(TestRSGroups.class);
+ private static HMaster master;
+ private static boolean init = false;
+ private static RSGroupAdminEndpoint RSGroupAdminEndpoint;
+
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ TEST_UTIL = new HBaseTestingUtility();
+ TEST_UTIL.getConfiguration().set(
+ HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
+ RSGroupBasedLoadBalancer.class.getName());
+ TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
+ RSGroupAdminEndpoint.class.getName());
+ TEST_UTIL.getConfiguration().setBoolean(
+ HConstants.ZOOKEEPER_USEMULTI,
+ true);
+ TEST_UTIL.startMiniCluster(NUM_SLAVES_BASE);
+ TEST_UTIL.getConfiguration().set(
+ ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
+ ""+NUM_SLAVES_BASE);
+
+ admin = TEST_UTIL.getHBaseAdmin();
+ cluster = TEST_UTIL.getHBaseCluster();
+ master = ((MiniHBaseCluster)cluster).getMaster();
+
+ //wait for balancer to come online
+ TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return master.isInitialized() &&
+ ((RSGroupBasedLoadBalancer) master.getLoadBalancer()).isOnline();
+ }
+ });
+ admin.setBalancerRunning(false,true);
+ rsGroupAdmin = new VerifyingRSGroupAdminClient(rsGroupAdmin.newClient(TEST_UTIL.getConnection()),
+ TEST_UTIL.getConfiguration());
+ RSGroupAdminEndpoint =
+ master.getMasterCoprocessorHost().findCoprocessors(RSGroupAdminEndpoint.class).get(0);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Before
+ public void beforeMethod() throws Exception {
+ if(!init) {
+ init = true;
+ afterMethod();
+ }
+
+ }
+
+ @After
+ public void afterMethod() throws Exception {
+ deleteTableIfNecessary();
+ deleteNamespaceIfNecessary();
+ deleteGroups();
+
+ int missing = NUM_SLAVES_BASE - getNumServers();
+ LOG.info("Restoring servers: "+missing);
+ for(int i=0; i<missing; i++) {
+ ((MiniHBaseCluster)cluster).startRegionServer();
+ }
+
+ rsGroupAdmin.addRSGroup("master");
+ ServerName masterServerName =
+ ((MiniHBaseCluster)cluster).getMaster().getServerName();
+
+ try {
+ rsGroupAdmin.moveServers(
+ Sets.newHashSet(masterServerName.getHostPort()),
+ "master");
+ } catch (Exception ex) {
+ // ignore
+ }
+ TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ LOG.info("Waiting for cleanup to finish " + rsGroupAdmin.listRSGroups());
+ //Might be greater since moving servers back to default
+ //is after starting a server
+
+ return rsGroupAdmin.getRSGroupInfo(RSGroupInfo.DEFAULT_GROUP).getServers().size()
+ == NUM_SLAVES_BASE;
+ }
+ });
+ }
+
+ @Test
+ public void testBasicStartUp() throws IOException {
+ RSGroupInfo defaultInfo = rsGroupAdmin.getRSGroupInfo(RSGroupInfo.DEFAULT_GROUP);
+ assertEquals(4, defaultInfo.getServers().size());
+ // Assignment of root and meta regions.
+ int count = master.getAssignmentManager().getRegionStates().getRegionAssignments().size();
+ //3 meta,namespace, group
+ assertEquals(3, count);
+ }
+
+ @Test
+ public void testNamespaceCreateAndAssign() throws Exception {
+ LOG.info("testNamespaceCreateAndAssign");
+ String nsName = tablePrefix+"_foo";
+ final TableName tableName = TableName.valueOf(nsName, tablePrefix + "_testCreateAndAssign");
+ RSGroupInfo appInfo = addGroup(rsGroupAdmin, "appInfo", 1);
+ admin.createNamespace(NamespaceDescriptor.create(nsName)
+ .addConfiguration(RSGroupInfo.NAMESPACEDESC_PROP_GROUP, "appInfo").build());
+ final HTableDescriptor desc = new HTableDescriptor(tableName);
+ desc.addFamily(new HColumnDescriptor("f"));
+ admin.createTable(desc);
+ //wait for created table to be assigned
+ TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return getTableRegionMap().get(desc.getTableName()) != null;
+ }
+ });
+ ServerName targetServer =
+ ServerName.parseServerName(appInfo.getServers().iterator().next().toString());
+ AdminProtos.AdminService.BlockingInterface rs = admin.getConnection().getAdmin(targetServer);
+ //verify it was assigned to the right group
+ Assert.assertEquals(1, ProtobufUtil.getOnlineRegions(rs).size());
+ }
+
+ @Test
+ public void testDefaultNamespaceCreateAndAssign() throws Exception {
+ LOG.info("testDefaultNamespaceCreateAndAssign");
+ final byte[] tableName = Bytes.toBytes(tablePrefix + "_testCreateAndAssign");
+ admin.modifyNamespace(NamespaceDescriptor.create("default")
+ .addConfiguration(RSGroupInfo.NAMESPACEDESC_PROP_GROUP, "default").build());
+ final HTableDescriptor desc = new HTableDescriptor(tableName);
+ desc.addFamily(new HColumnDescriptor("f"));
+ admin.createTable(desc);
+ //wait for created table to be assigned
+ TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return getTableRegionMap().get(desc.getTableName()) != null;
+ }
+ });
+ }
+
+ @Test
+ public void testNamespaceConstraint() throws Exception {
+ String nsName = tablePrefix+"_foo";
+ String groupName = tablePrefix+"_foo";
+ LOG.info("testNamespaceConstraint");
+ rsGroupAdmin.addRSGroup(groupName);
+ admin.createNamespace(NamespaceDescriptor.create(nsName)
+ .addConfiguration(RSGroupInfo.NAMESPACEDESC_PROP_GROUP, groupName)
+ .build());
+ //test removing a referenced group
+ try {
+ rsGroupAdmin.removeRSGroup(groupName);
+ fail("Expected a constraint exception");
+ } catch (IOException ex) {
+ }
+ //test modify group
+ //changing with the same name is fine
+ admin.modifyNamespace(
+ NamespaceDescriptor.create(nsName)
+ .addConfiguration(RSGroupInfo.NAMESPACEDESC_PROP_GROUP, groupName)
+ .build());
+ String anotherGroup = tablePrefix+"_anotherGroup";
+ rsGroupAdmin.addRSGroup(anotherGroup);
+ //test add non-existent group
+ admin.deleteNamespace(nsName);
+ rsGroupAdmin.removeRSGroup(groupName);
+ try {
+ admin.createNamespace(NamespaceDescriptor.create(nsName)
+ .addConfiguration(RSGroupInfo.NAMESPACEDESC_PROP_GROUP, "foo")
+ .build());
+ fail("Expected a constraint exception");
+ } catch (IOException ex) {
+ }
+ }
+
+ @Test
+ public void testGroupInfoMultiAccessing() throws Exception {
+ RSGroupInfoManager manager = RSGroupAdminEndpoint.getGroupInfoManager();
+ final RSGroupInfo defaultGroup = manager.getRSGroup("default");
+ // getRSGroup updates default group's server list
+ // this process must not affect other threads iterating the list
+ Iterator<HostAndPort> it = defaultGroup.getServers().iterator();
+ manager.getRSGroup("default");
+ it.next();
+ }
+
+ @Test
+ public void testMisplacedRegions() throws Exception {
+ final TableName tableName = TableName.valueOf(tablePrefix+"_testMisplacedRegions");
+ LOG.info("testMisplacedRegions");
+
+ final RSGroupInfo RSGroupInfo = addGroup(rsGroupAdmin, "testMisplacedRegions", 1);
+
+ TEST_UTIL.createMultiRegionTable(tableName, new byte[]{'f'}, 15);
+ TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
+
+ RSGroupAdminEndpoint.getGroupInfoManager()
+ .moveTables(Sets.newHashSet(tableName), RSGroupInfo.getName());
+
+ assertTrue(rsGroupAdmin.balanceRSGroup(RSGroupInfo.getName()));
+
+ TEST_UTIL.waitFor(60000, new Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ ServerName serverName =
+ ServerName.valueOf(RSGroupInfo.getServers().iterator().next().toString(), 1);
+ return admin.getConnection().getAdmin()
+ .getOnlineRegions(serverName).size() == 15;
+ }
+ });
+ }
+}