You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2015/08/14 19:07:22 UTC
[2/6] hbase git commit: HBASE-6721 RegionServer Group based
Assignment (Francis Liu)
http://git-wip-us.apache.org/repos/asf/hbase/blob/66e16163/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index aaaef80..2e87d7b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -41,8 +41,10 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.management.ObjectName;
+import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HostPort;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -76,6 +78,7 @@ import org.apache.hadoop.hbase.client.MetaScanner;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
@@ -83,6 +86,9 @@ import org.apache.hadoop.hbase.exceptions.MergeRegionException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
+import org.apache.hadoop.hbase.group.GroupAdminServer;
+import org.apache.hadoop.hbase.group.GroupInfo;
+import org.apache.hadoop.hbase.group.GroupableBalancer;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
@@ -121,8 +127,12 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Re
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddGroupRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddGroupResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceGroupRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceGroupResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest;
@@ -151,6 +161,12 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusR
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetGroupInfoOfServerRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetGroupInfoOfServerResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetGroupInfoOfTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetGroupInfoOfTableResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetGroupInfoRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetGroupInfoResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
@@ -171,6 +187,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshot
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListGroupInfosRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListGroupInfosResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
@@ -185,8 +203,14 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableReques
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveServersRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveServersResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveTablesRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveTablesResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RemoveGroupRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RemoveGroupResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest;
@@ -361,6 +385,8 @@ MasterServices, Server {
private LoadBalancerTracker loadBalancerTracker;
// master address tracker
private MasterAddressTracker masterAddressTracker;
+ // group admin apis
+ private GroupAdminServer groupAdminServer;
// RPC server for the HMaster
private final RpcServerInterface rpcServer;
@@ -968,6 +994,11 @@ MasterServices, Server {
this.initializationBeforeMetaAssignment = true;
+ if (balancer instanceof GroupableBalancer) {
+ groupAdminServer = new GroupAdminServer(this);
+ ((GroupableBalancer)balancer).setGroupInfoManager(groupAdminServer.getGroupInfoManager());
+ }
+
//initialize load balancer
this.balancer.setClusterStatus(getClusterStatus());
this.balancer.setMasterServices(this);
@@ -1795,11 +1826,17 @@ MasterServices, Server {
final byte[] destServerName) throws HBaseIOException {
RegionState regionState = assignmentManager.getRegionStates().
getRegionState(Bytes.toString(encodedRegionName));
- if (regionState == null) {
+
+ HRegionInfo hri;
+ if (Bytes.toString(encodedRegionName)
+ .equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
+ hri = HRegionInfo.FIRST_META_REGIONINFO;
+ } else if (regionState != null) {
+ hri = regionState.getRegion();
+ } else {
throw new UnknownRegionException(Bytes.toStringBinary(encodedRegionName));
}
- HRegionInfo hri = regionState.getRegion();
ServerName dest;
if (destServerName == null || destServerName.length == 0) {
LOG.info("Passed destination servername is null/empty so " +
@@ -1807,8 +1844,17 @@ MasterServices, Server {
final List<ServerName> destServers = this.serverManager.createDestinationServersList(
regionState.getServerName());
dest = balancer.randomAssignment(hri, destServers);
+ if (dest == null) {
+ LOG.debug("Unable to determine a plan to assign " + hri);
+ return;
+ }
} else {
- dest = ServerName.valueOf(Bytes.toString(destServerName));
+ ServerName candidate = ServerName.valueOf(Bytes.toString(destServerName));
+ dest = balancer.randomAssignment(hri, Lists.newArrayList(candidate));
+ if (dest == null) {
+ LOG.debug("Unable to determine a plan to assign " + hri);
+ return;
+ }
if (dest.equals(regionState.getServerName())) {
LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
+ " because region already assigned to the same server " + dest + ".");
@@ -2143,7 +2189,7 @@ MasterServices, Server {
throws ServiceException {
try {
addColumn(ProtobufUtil.toTableName(req.getTableName()),
- HColumnDescriptor.convert(req.getColumnFamilies()));
+ HColumnDescriptor.convert(req.getColumnFamilies()));
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
@@ -2174,7 +2220,7 @@ MasterServices, Server {
throws ServiceException {
try {
modifyColumn(ProtobufUtil.toTableName(req.getTableName()),
- HColumnDescriptor.convert(req.getColumnFamilies()));
+ HColumnDescriptor.convert(req.getColumnFamilies()));
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
@@ -2781,7 +2827,18 @@ MasterServices, Server {
}
Pair<HRegionInfo, ServerName> pair =
MetaReader.getRegion(this.catalogTracker, regionName);
- if (pair == null) throw new UnknownRegionException(Bytes.toString(regionName));
+ if (Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getRegionName(),regionName)) {
+ try {
+ pair = new Pair<HRegionInfo, ServerName>(HRegionInfo.FIRST_META_REGIONINFO,
+ this.catalogTracker.getMetaLocation());
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+ if (pair == null) {
+ throw new UnknownRegionException(Bytes.toString(regionName));
+ }
+
HRegionInfo hri = pair.getFirst();
if (cpHost != null) {
if (cpHost.preUnassign(hri, force)) {
@@ -3413,6 +3470,12 @@ MasterServices, Server {
@Override
public void createNamespace(NamespaceDescriptor descriptor) throws IOException {
TableName.isLegalNamespaceName(Bytes.toBytes(descriptor.getName()));
+
+ String group = descriptor.getConfigurationValue(GroupInfo.NAMESPACEDESC_PROP_GROUP);
+ if(group != null && groupAdminServer.getGroupInfo(group) == null) {
+ throw new ConstraintException("Region server group "+group+" does not exit");
+ }
+
if (cpHost != null) {
if (cpHost.preCreateNamespace(descriptor)) {
return;
@@ -3428,6 +3491,12 @@ MasterServices, Server {
@Override
public void modifyNamespace(NamespaceDescriptor descriptor) throws IOException {
TableName.isLegalNamespaceName(Bytes.toBytes(descriptor.getName()));
+
+ String group = descriptor.getConfigurationValue(GroupInfo.NAMESPACEDESC_PROP_GROUP);
+ if(group != null && groupAdminServer.getGroupInfo(group) == null) {
+ throw new ConstraintException("Region server group "+group+" does not exit");
+ }
+
if (cpHost != null) {
if (cpHost.preModifyNamespace(descriptor)) {
return;
@@ -3573,7 +3642,7 @@ MasterServices, Server {
.getDefaultLoadBalancerClass().getName());
}
- /**
+ /**
* Returns the security capabilities in effect on the cluster
*/
@Override
@@ -3612,4 +3681,158 @@ MasterServices, Server {
}
return response.build();
}
+
+ /**
+ * @return the value of this master's {@code balancer} field
+ */
+ @Override
+ public LoadBalancer getLoadBalancer() {
+ return balancer;
+ }
+
+ /**
+ * @return the value of the {@code groupAdminServer} field.
+ * NOTE(review): in the visible initialization code the field is only assigned when the
+ * balancer is a {@code GroupableBalancer}, so this presumably returns null otherwise -
+ * confirm against the rest of the class.
+ */
+ @Override
+ public GroupAdminServer getGroupAdminServer() {
+ return groupAdminServer;
+ }
+
+  /**
+   * RPC endpoint that looks up a region server group by name.
+   * The group is set on the response only when the lookup returns a non-null {@link GroupInfo}.
+   *
+   * @param controller RPC controller (unused)
+   * @param request carries the name of the group to look up
+   * @return the built response, with or without group info
+   * @throws ServiceException wrapping any IOException raised while serving the request
+   */
+  @Override
+  public GetGroupInfoResponse getGroupInfo(RpcController controller, GetGroupInfoRequest request)
+      throws ServiceException {
+    try {
+      MasterProtos.GetGroupInfoResponse.Builder reply =
+          MasterProtos.GetGroupInfoResponse.newBuilder();
+      GroupInfo found = groupAdminServer.getGroupInfo(request.getGroupName());
+      if (found != null) {
+        reply.setGroupInfo(ProtobufUtil.toProtoGroupInfo(found));
+      }
+      return reply.build();
+    } catch (IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
+
+  /**
+   * RPC endpoint that resolves the region server group a table belongs to.
+   * <p>
+   * The group is set on the response only when the lookup returns a non-null {@link GroupInfo},
+   * matching the null handling in {@link #getGroupInfo(RpcController, GetGroupInfoRequest)};
+   * previously a null result was handed straight to {@code ProtobufUtil.toProtoGroupInfo}.
+   * NOTE(review): assumes {@code group_info} is an optional field of
+   * GetGroupInfoOfTableResponse, as it evidently is for GetGroupInfoResponse - confirm in the
+   * proto definition.
+   *
+   * @param controller RPC controller (unused)
+   * @param request carries the table name to look up
+   * @return the built response; group info is present only if a group was found
+   * @throws ServiceException wrapping any IOException raised while serving the request
+   */
+  @Override
+  public GetGroupInfoOfTableResponse getGroupInfoOfTable(RpcController controller,
+      GetGroupInfoOfTableRequest request) throws ServiceException {
+    try {
+      MasterProtos.GetGroupInfoOfTableResponse.Builder builder =
+          MasterProtos.GetGroupInfoOfTableResponse.newBuilder();
+      GroupInfo groupInfo =
+          groupAdminServer.getGroupInfoOfTable(ProtobufUtil.toTableName(request.getTableName()));
+      // Leave the field unset rather than converting a null group.
+      if (groupInfo != null) {
+        builder.setGroupInfo(ProtobufUtil.toProtoGroupInfo(groupInfo));
+      }
+      return builder.build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  /**
+   * RPC endpoint that moves a set of servers into a target group.
+   * Each protobuf host/port in the request is converted to a {@link HostPort} and the resulting
+   * set is passed to {@code groupAdminServer.moveServers} together with the target group name.
+   *
+   * @param controller RPC controller (unused)
+   * @param request carries the servers and the target group
+   * @return an empty response
+   * @throws ServiceException wrapping any IOException raised while serving the request
+   */
+  @Override
+  public MoveServersResponse moveServers(RpcController controller, MoveServersRequest request)
+      throws ServiceException {
+    try {
+      MasterProtos.MoveServersResponse.Builder reply =
+          MasterProtos.MoveServersResponse.newBuilder();
+      Set<HostPort> toMove = Sets.newHashSet();
+      for (HBaseProtos.HostPort server : request.getServersList()) {
+        toMove.add(new HostPort(server.getHostName(), server.getPort()));
+      }
+      groupAdminServer.moveServers(toMove, request.getTargetGroup());
+      return reply.build();
+    } catch (IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
+
+  /**
+   * RPC endpoint that moves a set of tables into a target group.
+   * Each protobuf table name in the request is converted with {@code ProtobufUtil.toTableName}
+   * and the resulting set is passed to {@code groupAdminServer.moveTables}.
+   *
+   * @param controller RPC controller (unused)
+   * @param request carries the table names and the target group
+   * @return an empty response
+   * @throws ServiceException wrapping any IOException raised while serving the request
+   */
+  @Override
+  public MoveTablesResponse moveTables(RpcController controller, MoveTablesRequest request)
+      throws ServiceException {
+    try {
+      MasterProtos.MoveTablesResponse.Builder reply =
+          MasterProtos.MoveTablesResponse.newBuilder();
+      Set<TableName> toMove = new HashSet<TableName>(request.getTableNameList().size());
+      for (HBaseProtos.TableName protoName : request.getTableNameList()) {
+        toMove.add(ProtobufUtil.toTableName(protoName));
+      }
+      groupAdminServer.moveTables(toMove, request.getTargetGroup());
+      return reply.build();
+    } catch (IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
+
+  /**
+   * RPC endpoint that adds a group by passing the requested name to
+   * {@code groupAdminServer.addGroup}.
+   *
+   * @param controller RPC controller (unused)
+   * @param request carries the name of the group to add
+   * @return an empty response
+   * @throws ServiceException wrapping any IOException raised while serving the request
+   */
+  @Override
+  public AddGroupResponse addGroup(RpcController controller, AddGroupRequest request)
+      throws ServiceException {
+    try {
+      MasterProtos.AddGroupResponse.Builder reply = MasterProtos.AddGroupResponse.newBuilder();
+      groupAdminServer.addGroup(request.getGroupName());
+      return reply.build();
+    } catch (IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
+
+  /**
+   * RPC endpoint that removes a group by passing the requested name to
+   * {@code groupAdminServer.removeGroup}.
+   *
+   * @param controller RPC controller (unused)
+   * @param request carries the name of the group to remove
+   * @return an empty response
+   * @throws ServiceException wrapping any IOException raised while serving the request
+   */
+  @Override
+  public RemoveGroupResponse removeGroup(RpcController controller, RemoveGroupRequest request)
+      throws ServiceException {
+    try {
+      MasterProtos.RemoveGroupResponse.Builder reply =
+          MasterProtos.RemoveGroupResponse.newBuilder();
+      groupAdminServer.removeGroup(request.getGroupName());
+      return reply.build();
+    } catch (IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
+
+  /**
+   * RPC endpoint that asks {@code groupAdminServer} to balance the named group and reports the
+   * value it returns in the response's {@code balanceRan} field.
+   *
+   * @param controller RPC controller (unused)
+   * @param request carries the name of the group to balance
+   * @return response holding the result of {@code groupAdminServer.balanceGroup}
+   * @throws ServiceException wrapping any IOException raised while serving the request
+   */
+  @Override
+  public BalanceGroupResponse balanceGroup(RpcController controller, BalanceGroupRequest request)
+      throws ServiceException {
+    try {
+      MasterProtos.BalanceGroupResponse.Builder reply =
+          MasterProtos.BalanceGroupResponse.newBuilder();
+      reply.setBalanceRan(groupAdminServer.balanceGroup(request.getGroupName()));
+      return reply.build();
+    } catch (IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
+
+  /**
+   * RPC endpoint that returns every group reported by {@code groupAdminServer.listGroups()},
+   * each converted to its protobuf form.
+   *
+   * @param controller RPC controller (unused)
+   * @param request the list request (no fields are read)
+   * @return response containing one group info entry per listed group
+   * @throws ServiceException wrapping any IOException raised while serving the request
+   */
+  @Override
+  public ListGroupInfosResponse listGroupInfos(RpcController controller,
+      ListGroupInfosRequest request) throws ServiceException {
+    try {
+      MasterProtos.ListGroupInfosResponse.Builder reply =
+          MasterProtos.ListGroupInfosResponse.newBuilder();
+      for (GroupInfo group : groupAdminServer.listGroups()) {
+        reply.addGroupInfo(ProtobufUtil.toProtoGroupInfo(group));
+      }
+      return reply.build();
+    } catch (IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
+
+  /**
+   * RPC endpoint that resolves the region server group a server (host and port) belongs to.
+   * <p>
+   * The group is set on the response only when the lookup returns a non-null {@link GroupInfo},
+   * matching the null handling in {@link #getGroupInfo(RpcController, GetGroupInfoRequest)};
+   * previously a null result was handed straight to {@code ProtobufUtil.toProtoGroupInfo}.
+   * NOTE(review): assumes {@code group_info} is an optional field of
+   * GetGroupInfoOfServerResponse, as it evidently is for GetGroupInfoResponse - confirm in the
+   * proto definition.
+   *
+   * @param controller RPC controller (unused)
+   * @param request carries the host name and port of the server to look up
+   * @return the built response; group info is present only if a group was found
+   * @throws ServiceException wrapping any IOException raised while serving the request
+   */
+  @Override
+  public GetGroupInfoOfServerResponse getGroupInfoOfServer(RpcController controller,
+      GetGroupInfoOfServerRequest request) throws ServiceException {
+    try {
+      MasterProtos.GetGroupInfoOfServerResponse.Builder builder =
+          MasterProtos.GetGroupInfoOfServerResponse.newBuilder();
+      GroupInfo groupInfo = groupAdminServer.getGroupOfServer(
+          new HostPort(request.getServer().getHostName(), request.getServer().getPort()));
+      // Leave the field unset rather than converting a null group.
+      if (groupInfo != null) {
+        builder.setGroupInfo(ProtobufUtil.toProtoGroupInfo(groupInfo));
+      }
+      return builder.build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/66e16163/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
index e24d745..abd0268 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
@@ -48,6 +48,9 @@ import org.apache.hadoop.hbase.Stoppable;
@InterfaceAudience.Private
public interface LoadBalancer extends Configurable, Stoppable {
+ //used to signal to the caller that the region(s) cannot be assigned
+ ServerName BOGUS_SERVER_NAME = ServerName.parseServerName("127.0.0.1,1,1");
+
/**
* Set the current cluster status. This allows a LoadBalancer to map host name to a server
* @param st
http://git-wip-us.apache.org/repos/asf/hbase/blob/66e16163/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
index a9bb081..73faf3c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.List;
+import java.util.Set;
import org.apache.commons.lang.ClassUtils;
import org.apache.commons.logging.Log;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HostPort;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@@ -850,6 +852,116 @@ public class MasterCoprocessorHost
}
});
}
+
+  /**
+   * Hands {@code execOperation} an operation that calls
+   * {@code MasterObserver.preMoveServers}; passes null when {@code coprocessors} is empty.
+   *
+   * @param servers servers forwarded to the observer
+   * @param targetGroup group name forwarded to the observer
+   * @throws IOException propagated from {@code execOperation}
+   */
+  public void preMoveServers(final Set<HostPort> servers, final String targetGroup)
+      throws IOException {
+    CoprocessorOperation operation = null;
+    if (!coprocessors.isEmpty()) {
+      operation = new CoprocessorOperation() {
+        @Override
+        public void call(MasterObserver observer,
+            ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
+          observer.preMoveServers(ctx, servers, targetGroup);
+        }
+      };
+    }
+    execOperation(operation);
+  }
+
+  /**
+   * Hands {@code execOperation} an operation that calls
+   * {@code MasterObserver.postMoveServers}; passes null when {@code coprocessors} is empty.
+   *
+   * @param servers servers forwarded to the observer
+   * @param targetGroup group name forwarded to the observer
+   * @throws IOException propagated from {@code execOperation}
+   */
+  public void postMoveServers(final Set<HostPort> servers, final String targetGroup)
+      throws IOException {
+    CoprocessorOperation operation = null;
+    if (!coprocessors.isEmpty()) {
+      operation = new CoprocessorOperation() {
+        @Override
+        public void call(MasterObserver observer,
+            ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
+          observer.postMoveServers(ctx, servers, targetGroup);
+        }
+      };
+    }
+    execOperation(operation);
+  }
+
+  /**
+   * Hands {@code execOperation} an operation that calls
+   * {@code MasterObserver.preMoveTables}; passes null when {@code coprocessors} is empty.
+   *
+   * @param tables tables forwarded to the observer
+   * @param targetGroup group name forwarded to the observer
+   * @throws IOException propagated from {@code execOperation}
+   */
+  public void preMoveTables(final Set<TableName> tables, final String targetGroup)
+      throws IOException {
+    CoprocessorOperation operation = null;
+    if (!coprocessors.isEmpty()) {
+      operation = new CoprocessorOperation() {
+        @Override
+        public void call(MasterObserver observer,
+            ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
+          observer.preMoveTables(ctx, tables, targetGroup);
+        }
+      };
+    }
+    execOperation(operation);
+  }
+
+  /**
+   * Hands {@code execOperation} an operation that calls
+   * {@code MasterObserver.postMoveTables}; passes null when {@code coprocessors} is empty.
+   *
+   * @param tables tables forwarded to the observer
+   * @param targetGroup group name forwarded to the observer
+   * @throws IOException propagated from {@code execOperation}
+   */
+  public void postMoveTables(final Set<TableName> tables, final String targetGroup)
+      throws IOException {
+    CoprocessorOperation operation = null;
+    if (!coprocessors.isEmpty()) {
+      operation = new CoprocessorOperation() {
+        @Override
+        public void call(MasterObserver observer,
+            ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
+          observer.postMoveTables(ctx, tables, targetGroup);
+        }
+      };
+    }
+    execOperation(operation);
+  }
+
+  /**
+   * Hands {@code execOperation} an operation that calls
+   * {@code MasterObserver.preAddGroup}; passes null when {@code coprocessors} is empty.
+   *
+   * @param name group name forwarded to the observer
+   * @throws IOException propagated from {@code execOperation}
+   */
+  public void preAddGroup(final String name) throws IOException {
+    CoprocessorOperation operation = null;
+    if (!coprocessors.isEmpty()) {
+      operation = new CoprocessorOperation() {
+        @Override
+        public void call(MasterObserver observer,
+            ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
+          observer.preAddGroup(ctx, name);
+        }
+      };
+    }
+    execOperation(operation);
+  }
+
+  /**
+   * Hands {@code execOperation} an operation that calls
+   * {@code MasterObserver.postAddGroup}; passes null when {@code coprocessors} is empty.
+   *
+   * @param name group name forwarded to the observer
+   * @throws IOException propagated from {@code execOperation}
+   */
+  public void postAddGroup(final String name) throws IOException {
+    CoprocessorOperation operation = null;
+    if (!coprocessors.isEmpty()) {
+      operation = new CoprocessorOperation() {
+        @Override
+        public void call(MasterObserver observer,
+            ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
+          observer.postAddGroup(ctx, name);
+        }
+      };
+    }
+    execOperation(operation);
+  }
+
+  /**
+   * Hands {@code execOperation} an operation that calls
+   * {@code MasterObserver.preRemoveGroup}; passes null when {@code coprocessors} is empty.
+   *
+   * @param name group name forwarded to the observer
+   * @throws IOException propagated from {@code execOperation}
+   */
+  public void preRemoveGroup(final String name) throws IOException {
+    CoprocessorOperation operation = null;
+    if (!coprocessors.isEmpty()) {
+      operation = new CoprocessorOperation() {
+        @Override
+        public void call(MasterObserver observer,
+            ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
+          observer.preRemoveGroup(ctx, name);
+        }
+      };
+    }
+    execOperation(operation);
+  }
+
+  /**
+   * Hands {@code execOperation} an operation that calls
+   * {@code MasterObserver.postRemoveGroup}; passes null when {@code coprocessors} is empty.
+   *
+   * @param name group name forwarded to the observer
+   * @throws IOException propagated from {@code execOperation}
+   */
+  public void postRemoveGroup(final String name) throws IOException {
+    CoprocessorOperation operation = null;
+    if (!coprocessors.isEmpty()) {
+      operation = new CoprocessorOperation() {
+        @Override
+        public void call(MasterObserver observer,
+            ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
+          observer.postRemoveGroup(ctx, name);
+        }
+      };
+    }
+    execOperation(operation);
+  }
+
+  /**
+   * Hands {@code execOperation} an operation that calls
+   * {@code MasterObserver.preBalanceGroup}; passes null when {@code coprocessors} is empty.
+   *
+   * @param name group name forwarded to the observer
+   * @throws IOException propagated from {@code execOperation}
+   */
+  public void preBalanceGroup(final String name) throws IOException {
+    CoprocessorOperation operation = null;
+    if (!coprocessors.isEmpty()) {
+      operation = new CoprocessorOperation() {
+        @Override
+        public void call(MasterObserver observer,
+            ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
+          observer.preBalanceGroup(ctx, name);
+        }
+      };
+    }
+    execOperation(operation);
+  }
+
+  /**
+   * Hands {@code execOperation} an operation that calls
+   * {@code MasterObserver.postBalanceGroup}; passes null when {@code coprocessors} is empty.
+   *
+   * @param name group name forwarded to the observer
+   * @param balanceRan flag forwarded to the observer
+   * @throws IOException propagated from {@code execOperation}
+   */
+  public void postBalanceGroup(final String name, final boolean balanceRan)
+      throws IOException {
+    CoprocessorOperation operation = null;
+    if (!coprocessors.isEmpty()) {
+      operation = new CoprocessorOperation() {
+        @Override
+        public void call(MasterObserver observer,
+            ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
+          observer.postBalanceGroup(ctx, name, balanceRan);
+        }
+      };
+    }
+    execOperation(operation);
+  }
private static abstract class CoprocessorOperation
extends ObserverContext<MasterCoprocessorEnvironment> {
http://git-wip-us.apache.org/repos/asf/hbase/blob/66e16163/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index c402758..8844ea5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.group.GroupAdminServer;
import com.google.protobuf.Service;
@@ -250,4 +251,14 @@ public interface MasterServices extends Server {
* @throws IOException
*/
public List<TableName> listTableNamesByNamespace(String name) throws IOException;
+
+ /**
+ * @return the {@link LoadBalancer} exposed by this master service
+ */
+ public LoadBalancer getLoadBalancer();
+
+ /**
+ * @return the {@link GroupAdminServer}; callers added in this change (the create/delete
+ * table handlers) null-check the result, so it may presumably be null
+ */
+ public GroupAdminServer getGroupAdminServer();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/66e16163/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
index 24d8c71..868818f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master.handler;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.security.PrivilegedExceptionAction;
+import java.security.acl.Group;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -40,6 +41,7 @@ import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.group.GroupAdminServer;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.HMaster;
@@ -64,6 +66,7 @@ public class CreateTableHandler extends EventHandler {
protected final MasterFileSystem fileSystemManager;
protected final HTableDescriptor hTableDescriptor;
protected final Configuration conf;
+ private final MasterServices masterServices;
private final AssignmentManager assignmentManager;
private final CatalogTracker catalogTracker;
private final TableLockManager tableLockManager;
@@ -83,6 +86,7 @@ public class CreateTableHandler extends EventHandler {
this.catalogTracker = masterServices.getCatalogTracker();
this.assignmentManager = masterServices.getAssignmentManager();
this.tableLockManager = masterServices.getTableLockManager();
+ this.masterServices = masterServices;
this.tableLock = this.tableLockManager.writeLock(this.hTableDescriptor.getTableName()
, EventType.C_M_CREATE_TABLE.toString());
@@ -140,6 +144,15 @@ public class CreateTableHandler extends EventHandler {
throw new IOException("Unable to ensure that the table will be" +
" enabling because of a ZooKeeper issue", e);
}
+
+ //prepare table's group affiliation
+ //If master is not initialized and a create table is spawned then it is
+ //a special table and group affiliation should be taken care of explicitly
+ GroupAdminServer groupAdminServer = masterServices.getGroupAdminServer();
+ if (groupAdminServer != null && masterServices.isInitialized()) {
+ groupAdminServer.prepareGroupForTable(hTableDescriptor);
+ }
+
success = true;
} finally {
if (!success) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/66e16163/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
index 474da0a..5164ad7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
@@ -134,6 +134,12 @@ public class DeleteTableHandler extends TableEventHandler {
// 9. Clean up any remaining rows for this table
cleanAnyRemainingRows();
+
+ // Remove group affiliation
+ if (masterServices.getGroupAdminServer() != null) {
+ LOG.debug("Removing " + tableName + " from group.");
+ masterServices.getGroupAdminServer().cleanupGroupForTable(tableName);
+ }
}
if (cpHost != null) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/66e16163/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index 94d716b..cf551f1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HostPort;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.KeyValueUtil;
@@ -419,6 +420,7 @@ public class AccessController extends BaseMasterAndRegionObserver
* @throws IOException if obtaining the current user fails
* @throws AccessDeniedException if user has no authorization
*/
+
private void requirePermission(String request, TableName tableName, byte[] family,
byte[] qualifier, Action... permissions) throws IOException {
User user = getActiveUser();
@@ -2482,4 +2484,34 @@ public class AccessController extends BaseMasterAndRegionObserver
public void postReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
List<WALEntry> entries, CellScanner cells) throws IOException {
}
+
+ /**
+ * Pre-hook for moveServers: delegates to
+ * {@code requirePermission("moveServers", Action.ADMIN)}; the arguments are not inspected.
+ * @throws IOException propagated from the permission check
+ */
+ @Override
+ public void preMoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ Set<HostPort> servers, String targetGroup) throws IOException {
+ requirePermission("moveServers", Action.ADMIN);
+ }
+
+ /**
+ * Pre-hook for moveTables: delegates to
+ * {@code requirePermission("moveTables", Action.ADMIN)}; the arguments are not inspected.
+ * @throws IOException propagated from the permission check
+ */
+ @Override
+ public void preMoveTables(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ Set<TableName> tables, String targetGroup) throws IOException {
+ requirePermission("moveTables", Action.ADMIN);
+ }
+
+ /**
+ * Pre-hook for addGroup: delegates to
+ * {@code requirePermission("addGroup", Action.ADMIN)}; the arguments are not inspected.
+ * @throws IOException propagated from the permission check
+ */
+ @Override
+ public void preAddGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ String name) throws IOException {
+ requirePermission("addGroup", Action.ADMIN);
+ }
+
+ /**
+ * Pre-hook for removeGroup: delegates to
+ * {@code requirePermission("removeGroup", Action.ADMIN)}; the arguments are not inspected.
+ * @throws IOException propagated from the permission check
+ */
+ @Override
+ public void preRemoveGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ String name) throws IOException {
+ requirePermission("removeGroup", Action.ADMIN);
+ }
+
+ /**
+ * Pre-hook for balanceGroup: delegates to
+ * {@code requirePermission("balanceGroup", Action.ADMIN)}; the arguments are not inspected.
+ * @throws IOException propagated from the permission check
+ */
+ @Override
+ public void preBalanceGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ String groupName) throws IOException {
+ requirePermission("balanceGroup", Action.ADMIN);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/66e16163/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
index bbabf3a..8189ddb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
@@ -29,6 +29,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
@@ -990,6 +991,56 @@ public class TestMasterObserver {
public boolean wasGetTableDescriptorsCalled() {
return preGetTableDescriptorsCalled && postGetTableDescriptorsCalled;
}
+
+ @Override
+ public void preMoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ Set<HostPort> servers, String targetGroup) throws IOException {
+ // No-op: nothing is recorded for this hook.
+ }
+
+ @Override
+ public void postMoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ Set<HostPort> servers, String targetGroup) throws IOException {
+ // No-op: nothing is recorded for this hook.
+ }
+
+ @Override
+ public void preMoveTables(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ Set<TableName> tables, String targetGroup) throws IOException {
+ // No-op: nothing is recorded for this hook.
+ }
+
+ @Override
+ public void postMoveTables(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ Set<TableName> tables, String targetGroup) throws IOException {
+ // No-op: nothing is recorded for this hook.
+ }
+
+ @Override
+ public void preAddGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ String name) throws IOException {
+ // No-op: nothing is recorded for this hook.
+ }
+
+ @Override
+ public void postAddGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ String name) throws IOException {
+ // No-op: nothing is recorded for this hook.
+ }
+
+ @Override
+ public void preRemoveGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ String name) throws IOException {
+ // No-op: nothing is recorded for this hook.
+ }
+
+ @Override
+ public void postRemoveGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ String name) throws IOException {
+ // No-op: nothing is recorded for this hook.
+ }
+
+ @Override
+ public void preBalanceGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ String groupName) throws IOException {
+ // No-op: nothing is recorded for this hook.
+ }
+
+ @Override
+ public void postBalanceGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ String groupName, boolean balancerRan) throws IOException {
+ // No-op: nothing is recorded for this hook; balancerRan is ignored.
+ }
}
private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
http://git-wip-us.apache.org/repos/asf/hbase/blob/66e16163/hbase-server/src/test/java/org/apache/hadoop/hbase/group/TestGroups.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/group/TestGroups.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/group/TestGroups.java
new file mode 100644
index 0000000..50a303f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/group/TestGroups.java
@@ -0,0 +1,389 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.group;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HostPort;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MediumTests.class})
+public class TestGroups extends TestGroupsBase {
+ protected static final Log LOG = LogFactory.getLog(TestGroups.class);
+ private static HMaster master;
+ private static boolean init = false;
+
+
+ /**
+ * One-time fixture: starts a mini cluster with NUM_SLAVES_BASE region servers configured
+ * to use GroupBasedLoadBalancer, waits until the master is initialized and the balancer
+ * reports itself online, calls setBalancerRunning(false, true) and wraps the connection's
+ * group admin in a VerifyingGroupAdminClient.
+ */
+ @BeforeClass
+ public static void setUp() throws Exception {
+ TEST_UTIL = new HBaseTestingUtility();
+ TEST_UTIL.getConfiguration().set(
+ HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
+ GroupBasedLoadBalancer.class.getName());
+ TEST_UTIL.getConfiguration().setBoolean(
+ HConstants.ZOOKEEPER_USEMULTI,
+ true);
+ TEST_UTIL.startMiniCluster(NUM_SLAVES_BASE);
+ // NOTE(review): this property is written after startMiniCluster() has returned, so it
+ // presumably cannot affect the startup above -- confirm whether it should be set earlier.
+ TEST_UTIL.getConfiguration().set(
+ ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
+ ""+NUM_SLAVES_BASE);
+
+ admin = TEST_UTIL.getHBaseAdmin();
+ cluster = TEST_UTIL.getHBaseCluster();
+ master = ((MiniHBaseCluster)cluster).getMaster();
+
+ //wait for balancer to come online
+ TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return master.isInitialized() &&
+ ((GroupBasedLoadBalancer) master.getLoadBalancer()).isOnline();
+ }
+ });
+ admin.setBalancerRunning(false, true);
+ groupAdmin = new VerifyingGroupAdminClient(admin.getConnection().getGroupAdmin(),
+ TEST_UTIL.getConfiguration());
+ }
+
+ /** Shuts down the mini cluster started in {@link #setUp()} once all tests have run. */
+ @AfterClass
+ public static void tearDown() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+  * Runs the {@link #afterMethod()} cleanup exactly once, ahead of the first test of the
+  * class; later invocations return immediately because {@code init} is already set.
+  */
+ @Before
+ public void beforeMethod() throws Exception {
+   if (init) {
+     return;
+   }
+   init = true;
+   afterMethod();
+ }
+
+ /**
+  * Per-test cleanup: removes test tables, namespaces and groups, starts as many region
+  * servers as are missing to get back to NUM_SLAVES_BASE, then waits until the default
+  * group holds exactly NUM_SLAVES_BASE servers.
+  */
+ @After
+ public void afterMethod() throws Exception {
+   deleteTableIfNecessary();
+   deleteNamespaceIfNecessary();
+   deleteGroups();
+
+   int liveServers = cluster.getClusterStatus().getServers().size();
+   int missing = NUM_SLAVES_BASE - liveServers;
+   LOG.info("Restoring servers: "+missing);
+   int started = 0;
+   while (started < missing) {
+     ((MiniHBaseCluster)cluster).startRegionServer();
+     started++;
+   }
+
+   Waiter.Predicate<Exception> defaultGroupRestored = new Waiter.Predicate<Exception>() {
+     @Override
+     public boolean evaluate() throws Exception {
+       LOG.info("Waiting for cleanup to finish " + groupAdmin.listGroups());
+       // The size may temporarily differ because servers are moved back to the
+       // default group only after a replacement server has been started.
+       int defaultSize =
+           groupAdmin.getGroupInfo(GroupInfo.DEFAULT_GROUP).getServers().size();
+       return defaultSize == NUM_SLAVES_BASE;
+     }
+   };
+   TEST_UTIL.waitFor(WAIT_TIMEOUT, defaultGroupRestored);
+ }
+
+ /**
+ * Checks that an MBean named "hadoop:name=Group,service=Group" is registered and that an
+ * MXBeanImpl built on top of a mocked master reports the same group names, tables and
+ * servers as groupAdmin, first with only the default group and then after adding group
+ * "bar" and moving one table into it.
+ */
+ @Test
+ public void testJmx() throws Exception {
+ MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+ Iterator<ObjectName> it = mBeanServer.queryNames(new ObjectName("hadoop:name=Group,service=Group"), null).iterator();
+ //verify it was loaded properly
+ assertEquals("hadoop:name=Group,service=Group", it.next().getCanonicalName());
+
+ // Filled in by the mock answer below with the server it pretends is offline.
+ final AtomicReference<HostPort> deadServer = new AtomicReference<HostPort>(null);
+
+ //We use mocks to simulate offline servers to avoid
+ //the complexity and overhead of killing servers
+ MasterServices mockMaster = Mockito.mock(MasterServices.class);
+ final ServerManager mockServerManager = Mockito.mock(ServerManager.class);
+ Mockito.when(mockMaster.getServerManager()).thenReturn(mockServerManager);
+ Mockito.when(mockServerManager.getOnlineServersList()).then(new Answer<List<ServerName>>() {
+ @Override
+ public List<ServerName> answer(InvocationOnMock invocation) throws Throwable {
+ // Report every really-online server except the last one of the default group.
+ GroupInfo groupInfo = groupAdmin.getGroupInfo(GroupInfo.DEFAULT_GROUP);
+ List<ServerName> finalList = Lists.newArrayList();
+ HostPort lastServer = groupInfo.getServers().last();
+ for (ServerName server: master.getServerManager().getOnlineServersList()) {
+ if (!server.getHostPort().equals(lastServer)) {
+ finalList.add(server);
+ }
+ }
+ deadServer.set(lastServer);
+ return finalList;
+ }
+ });
+ MXBean info = new MXBeanImpl(groupAdmin, mockMaster);
+
+
+ GroupInfo defaultGroup = groupAdmin.getGroupInfo(GroupInfo.DEFAULT_GROUP);
+ assertEquals(1, info.getGroups().size());
+ assertEquals(defaultGroup.getName(), info.getGroups().get(0).getName());
+ assertEquals(defaultGroup.getServers(), Sets.newTreeSet(info.getGroups().get(0).getServers()));
+ // NOTE(review): deadServer.get() is evaluated before getServersByGroup() is called on this
+ // line, so it is only non-null if an earlier MXBean call already triggered the mock answer
+ // -- confirm against MXBeanImpl.
+ assertEquals(defaultGroup.getServers().headSet(deadServer.get()),
+ Sets.newTreeSet(info.getServersByGroup().get(GroupInfo.DEFAULT_GROUP)));
+
+ GroupInfo barGroup = addGroup(groupAdmin, "bar", 3);
+ TableName tableName1 = TableName.valueOf(tablePrefix+"_testJmx1");
+ TableName tableName2 = TableName.valueOf(tablePrefix+"_testJmx2");
+ TEST_UTIL.createTable(tableName1, Bytes.toBytes("f"));
+ TEST_UTIL.createTable(tableName2, Bytes.toBytes("f"));
+ groupAdmin.moveTables(Sets.newHashSet(tableName2), barGroup.getName());
+ assertEquals(2, info.getGroups().size());
+
+ // Positions of the two groups inside info.getGroups(); they stay -1 if a group is missing,
+ // in which case the get(...) calls below fail with an index error.
+ int defaultIndex = -1;
+ int barIndex = -1;
+
+ for(int i=0; i<info.getGroups().size(); i++) {
+ MXBean.GroupInfoBean bean = info.getGroups().get(i);
+ if(bean.getName().equals(defaultGroup.getName())) {
+ defaultIndex = i;
+ }
+ else if(bean.getName().equals(barGroup.getName())) {
+ barIndex = i;
+ }
+ }
+
+ // Compare the default group bean with a freshly fetched GroupInfo.
+ defaultGroup = groupAdmin.getGroupInfo(GroupInfo.DEFAULT_GROUP);
+ assertEquals(defaultGroup.getName(),
+ info.getGroups().get(defaultIndex).getName());
+ for(TableName entry: defaultGroup.getTables()) {
+ assertTrue(info.getGroups().get(defaultIndex).getTables().contains(entry));
+ }
+ assertEquals(defaultGroup.getTables().size(),
+ info.getGroups().get(defaultIndex).getTables().size());
+ assertEquals(defaultGroup.getServers(),
+ Sets.newTreeSet(info.getGroups().get(defaultIndex).getServers()));
+
+ // Same comparison for group "bar".
+ barGroup = groupAdmin.getGroupInfo(barGroup.getName());
+ assertEquals(barGroup.getName(),
+ info.getGroups().get(barIndex).getName());
+ for(TableName entry: barGroup.getTables()) {
+ assertTrue(info.getGroups().get(barIndex).getTables().contains(entry));
+ }
+ assertEquals(barGroup.getTables().size(),
+ info.getGroups().get(barIndex).getTables().size());
+ assertEquals(barGroup.getServers(),
+ Sets.newTreeSet(info.getGroups().get(barIndex).getServers()));
+ }
+
+ /**
+  * Verifies the state right after startup: the default group contains every region server
+  * the cluster was started with, and exactly three regions are assigned.
+  */
+ @Test
+ public void testBasicStartUp() throws IOException {
+   GroupInfo defaultInfo = groupAdmin.getGroupInfo(GroupInfo.DEFAULT_GROUP);
+   // The cluster is started with NUM_SLAVES_BASE servers; use the constant rather than a
+   // literal so the check follows any change to the cluster size.
+   assertEquals(NUM_SLAVES_BASE, defaultInfo.getServers().size());
+   int count = master.getAssignmentManager().getRegionStates().getRegionAssignments().size();
+   // 3 regions expected: meta, namespace, group
+   assertEquals(3, count);
+ }
+
+ /**
+  * Creates a namespace bound to a one-server group, creates a table inside it and checks
+  * that the table's region is served by that group's server and that the group property
+  * is absent from the stored table descriptor.
+  */
+ @Test
+ public void testNamespaceCreateAndAssign() throws Exception {
+   LOG.info("testNamespaceCreateAndAssign");
+   String nsName = tablePrefix+"_foo";
+   final TableName tableName = TableName.valueOf(nsName, tablePrefix + "_testCreateAndAssign");
+   GroupInfo appInfo = addGroup(groupAdmin, "appInfo", 1);
+
+   NamespaceDescriptor namespace = NamespaceDescriptor.create(nsName)
+       .addConfiguration(GroupInfo.NAMESPACEDESC_PROP_GROUP, "appInfo").build();
+   admin.createNamespace(namespace);
+
+   final HTableDescriptor desc = new HTableDescriptor(tableName);
+   desc.addFamily(new HColumnDescriptor("f"));
+   admin.createTable(desc);
+
+   // Block until the new table shows up in the region map, i.e. it has been assigned.
+   Waiter.Predicate<Exception> tableAssigned = new Waiter.Predicate<Exception>() {
+     @Override
+     public boolean evaluate() throws Exception {
+       return getTableRegionMap().get(desc.getTableName()) != null;
+     }
+   };
+   TEST_UTIL.waitFor(WAIT_TIMEOUT, tableAssigned);
+
+   HostPort groupServer = appInfo.getServers().iterator().next();
+   ServerName targetServer = ServerName.parseServerName(groupServer.toString());
+   AdminProtos.AdminService.BlockingInterface regionServerAdmin =
+       admin.getConnection().getAdmin(targetServer);
+   // The group's only server must host exactly the one region of the new table.
+   assertEquals(1, ProtobufUtil.getOnlineRegions(regionServerAdmin).size());
+   // The group property must not leak into the persisted table schema.
+   assertNull(admin.getTableDescriptor(tableName).getValue(GroupInfo.TABLEDESC_PROP_GROUP));
+ }
+
+ /**
+  * Binds the "default" namespace to the "default" group, creates a table in it and waits
+  * until that table appears in the region map.
+  */
+ @Test
+ public void testDefaultNamespaceCreateAndAssign() throws Exception {
+   LOG.info("testDefaultNamespaceCreateAndAssign");
+   final byte[] tableName = Bytes.toBytes(tablePrefix + "_testCreateAndAssign");
+
+   NamespaceDescriptor defaultNamespace = NamespaceDescriptor.create("default")
+       .addConfiguration(GroupInfo.NAMESPACEDESC_PROP_GROUP, "default").build();
+   admin.modifyNamespace(defaultNamespace);
+
+   final HTableDescriptor desc = new HTableDescriptor(tableName);
+   desc.addFamily(new HColumnDescriptor("f"));
+   admin.createTable(desc);
+
+   // Block until the new table has been assigned.
+   Waiter.Predicate<Exception> tableAssigned = new Waiter.Predicate<Exception>() {
+     @Override
+     public boolean evaluate() throws Exception {
+       return getTableRegionMap().get(desc.getTableName()) != null;
+     }
+   };
+   TEST_UTIL.waitFor(WAIT_TIMEOUT, tableAssigned);
+ }
+
+ /**
+  * Exercises the namespace/group constraints: a group referenced by a namespace cannot be
+  * removed, re-applying the same group to the namespace is accepted, and a namespace
+  * cannot be created against a group that does not exist.
+  */
+ @Test
+ public void testNamespaceConstraint() throws Exception {
+   String nsName = tablePrefix+"_foo";
+   String groupName = tablePrefix+"_foo";
+   LOG.info("testNamespaceConstraint");
+   groupAdmin.addGroup(groupName);
+   NamespaceDescriptor initial = NamespaceDescriptor.create(nsName)
+       .addConfiguration(GroupInfo.NAMESPACEDESC_PROP_GROUP, groupName)
+       .build();
+   admin.createNamespace(initial);
+
+   // Removing a group that a namespace still references must be rejected.
+   try {
+     groupAdmin.removeGroup(groupName);
+     fail("Expected a constraint exception");
+   } catch (IOException expected) {
+     // expected: the group is still referenced by the namespace
+   }
+
+   // Modifying the namespace while keeping the same group name is fine.
+   NamespaceDescriptor unchanged = NamespaceDescriptor.create(nsName)
+       .addConfiguration(GroupInfo.NAMESPACEDESC_PROP_GROUP, groupName)
+       .build();
+   admin.modifyNamespace(unchanged);
+
+   String anotherGroup = tablePrefix+"_anotherGroup";
+   groupAdmin.addGroup(anotherGroup);
+
+   // Once the namespace is gone the group can be removed ...
+   admin.deleteNamespace(nsName);
+   groupAdmin.removeGroup(groupName);
+   // ... and creating a namespace that points at a non-existent group must be rejected.
+   try {
+     admin.createNamespace(NamespaceDescriptor.create(nsName)
+         .addConfiguration(GroupInfo.NAMESPACEDESC_PROP_GROUP, "foo")
+         .build());
+     fail("Expected a constraint exception");
+   } catch (IOException expected) {
+     // expected: group "foo" does not exist
+   }
+ }
+
+ /**
+  * Opens an iterator over the default group's servers, calls getGroup("default") again and
+  * then advances the iterator; the second lookup must not invalidate the open iterator.
+  */
+ @Test
+ public void testGroupInfoMultiAccessing() throws Exception {
+   GroupInfoManager infoManager = master.getGroupAdminServer().getGroupInfoManager();
+   final GroupInfo defaultGroup = infoManager.getGroup("default");
+   // getGroup refreshes the default group's server list; an iterator obtained before the
+   // refresh must remain usable afterwards.
+   Iterator<HostPort> serverIterator = defaultGroup.getServers().iterator();
+   infoManager.getGroup("default");
+   serverIterator.next();
+ }
+
+ /**
+  * Starts a GroupTracker on its own ZooKeeperWatcher and checks that its listener is
+  * notified, and the tracked group map updated, when groups "foo" and "bar" are added.
+  *
+  * All waits are bounded so a missing notification fails the test instead of hanging it,
+  * and the watcher is closed when the test finishes.
+  */
+ @Test
+ public void testTracker() throws IOException, InterruptedException {
+   LOG.info("testTracker");
+   ZooKeeperWatcher watcher =
+       new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), "testTracker", null);
+   GroupTracker tracker = new GroupTracker(watcher, null);
+   try {
+     final Map<String, GroupInfo> groupMap = new ConcurrentHashMap<String, GroupInfo>();
+     final AtomicBoolean stateChanged = new AtomicBoolean(false);
+     GroupTracker.Listener listener = new GroupTracker.Listener() {
+
+       @Override
+       public void groupMapChanged(Map<String, GroupInfo> map) {
+         groupMap.clear();
+         groupMap.putAll(map);
+         stateChanged.set(true);
+       }
+     };
+     tracker.addListener(listener);
+     tracker.start();
+
+     //wait for tracker to retrieve initial info
+     tracker.blockUntilReady(0);
+     int tries = 60000/100;
+     while(groupMap.size() < 1 && tries > 0) {
+       Thread.sleep(100);
+       tries--;
+     }
+     // Assert on the condition that was awaited; the previous assertNotSame(0, tries)
+     // compared boxed Integer identities rather than values.
+     assertTrue("Tracker never delivered the initial group map", groupMap.size() >= 1);
+     assertNotNull(groupAdmin.getGroupInfo("default"));
+
+     stateChanged.set(false);
+     groupAdmin.addGroup("foo");
+     awaitStateChange(stateChanged);
+     stateChanged.set(false);
+     assertEquals(2, groupMap.size());
+     assertNotNull(tracker.getGroup("foo"));
+     assertEquals(0, tracker.getGroup("foo").getServers().size());
+
+     addGroup(groupAdmin, "bar", 1);
+     awaitStateChange(stateChanged);
+     stateChanged.set(false);
+     assertEquals(3, groupMap.size());
+     assertNotNull(tracker.getGroup("bar"));
+     assertEquals(1, tracker.getGroup("bar").getServers().size());
+   } finally {
+     try {
+       tracker.stop();
+     } finally {
+       // The watcher is created by this test, so it is this test's job to release it.
+       watcher.close();
+     }
+   }
+ }
+
+ /**
+  * Polls the flag every 100 ms for at most 60 seconds and fails the test if it is still
+  * unset afterwards.
+  */
+ private static void awaitStateChange(AtomicBoolean stateChanged)
+     throws InterruptedException {
+   for (int tries = 60000/100; !stateChanged.get() && tries > 0; tries--) {
+     Thread.sleep(100);
+   }
+   assertTrue("Timed out waiting for a group map change notification", stateChanged.get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/66e16163/hbase-server/src/test/java/org/apache/hadoop/hbase/group/TestGroupsBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/group/TestGroupsBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/group/TestGroupsBase.java
new file mode 100644
index 0000000..6db4578
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/group/TestGroupsBase.java
@@ -0,0 +1,567 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.group;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HBaseCluster;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HostPort;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.RegionLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.constraint.ConstraintException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.security.SecureRandom;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public abstract class TestGroupsBase {
+ protected static final Log LOG = LogFactory.getLog(TestGroupsBase.class);
+
+ //shared
+ protected final static String groupPrefix = "Group";
+ protected final static String tablePrefix = "Group";
+ protected final static SecureRandom rand = new SecureRandom();
+
+ //shared, cluster type specific
+ protected static HBaseTestingUtility TEST_UTIL;
+ protected static HBaseAdmin admin;
+ protected static HBaseCluster cluster;
+ protected static GroupAdmin groupAdmin;
+
+ public final static long WAIT_TIMEOUT = 60000*5;
+ public final static int NUM_SLAVES_BASE = 4; //number of slaves for the smallest cluster
+
+
+
+ /**
+  * Creates group {@code groupName} and moves {@code serverCount} servers into it, taken
+  * from the default group in iteration order.
+  *
+  * @param gAdmin group admin used for all operations
+  * @param groupName name of the group to create
+  * @param serverCount number of servers to move out of the default group
+  * @return the group info as reported by {@code gAdmin} after the move
+  */
+ protected GroupInfo addGroup(GroupAdmin gAdmin, String groupName,
+     int serverCount) throws IOException, InterruptedException {
+   GroupInfo defaultInfo = gAdmin.getGroupInfo(GroupInfo.DEFAULT_GROUP);
+   assertTrue(defaultInfo != null);
+   assertTrue(defaultInfo.getServers().size() >= serverCount);
+   gAdmin.addGroup(groupName);
+
+   Set<HostPort> picked = new HashSet<HostPort>();
+   for (HostPort candidate : defaultInfo.getServers()) {
+     if (picked.size() != serverCount) {
+       picked.add(candidate);
+     } else {
+       break;
+     }
+   }
+   gAdmin.moveServers(picked, groupName);
+
+   GroupInfo created = gAdmin.getGroupInfo(groupName);
+   assertTrue(created.getServers().size() >= serverCount);
+   return created;
+ }
+
+ static void removeGroup(GroupAdminClient groupAdmin, String groupName) throws IOException {
+ GroupInfo groupInfo = groupAdmin.getGroupInfo(groupName);
+ groupAdmin.moveTables(groupInfo.getTables(), GroupInfo.DEFAULT_GROUP);
+ groupAdmin.moveServers(groupInfo.getServers(), GroupInfo.DEFAULT_GROUP);
+ groupAdmin.removeGroup(groupName);
+ }
+
+ protected void deleteTableIfNecessary() throws IOException {
+ for (HTableDescriptor desc : TEST_UTIL.getHBaseAdmin().listTables(tablePrefix+".*")) {
+ TEST_UTIL.deleteTable(desc.getName());
+ }
+ }
+
+ /** Deletes every namespace whose name starts with {@code tablePrefix}. */
+ protected void deleteNamespaceIfNecessary() throws IOException {
+   for (NamespaceDescriptor candidate : TEST_UTIL.getHBaseAdmin().listNamespaceDescriptors()) {
+     String namespaceName = candidate.getName();
+     if (!namespaceName.startsWith(tablePrefix)) {
+       continue;
+     }
+     admin.deleteNamespace(candidate.getName());
+   }
+ }
+
+ /**
+  * Removes every group except the default one, handing each group's tables and servers
+  * back to the default group first.
+  *
+  * The per-group work is delegated to {@link #removeGroup(GroupAdminClient, String)}
+  * instead of repeating the same three steps here.
+  */
+ protected void deleteGroups() throws IOException {
+   // A dedicated client, named so that it does not shadow the groupAdmin field.
+   GroupAdminClient client = new GroupAdminClient(TEST_UTIL.getConfiguration());
+   for (GroupInfo group : client.listGroups()) {
+     if (GroupInfo.DEFAULT_GROUP.equals(group.getName())) {
+       continue;
+     }
+     removeGroup(client, group.getName());
+   }
+ }
+
+ /**
+  * Flattens {@link #getTableServerRegionMap()} into a map from table to the names of all
+  * of its regions, regardless of the hosting server.
+  */
+ public Map<TableName, List<String>> getTableRegionMap() throws IOException {
+   Map<TableName, List<String>> regionsByTable = Maps.newTreeMap();
+   for (Map.Entry<TableName, Map<ServerName, List<String>>> tableEntry
+       : getTableServerRegionMap().entrySet()) {
+     List<String> allRegions = new LinkedList<String>();
+     for (List<String> regionsOnServer : tableEntry.getValue().values()) {
+       allRegions.addAll(regionsOnServer);
+     }
+     regionsByTable.put(tableEntry.getKey(), allRegions);
+   }
+   return regionsByTable;
+ }
+
+ /**
+  * Builds, from the current cluster status, a map of table to server to the names of the
+  * regions of that table reported in the server's region load.
+  */
+ public Map<TableName, Map<ServerName, List<String>>> getTableServerRegionMap()
+     throws IOException {
+   Map<TableName, Map<ServerName, List<String>>> result = Maps.newTreeMap();
+   ClusterStatus status = TEST_UTIL.getHBaseClusterInterface().getClusterStatus();
+   for (ServerName serverName : status.getServers()) {
+     for (RegionLoad regionLoad : status.getLoad(serverName).getRegionsLoad().values()) {
+       TableName tableName = HRegionInfo.getTable(regionLoad.getName());
+
+       Map<ServerName, List<String>> regionsByServer = result.get(tableName);
+       if (regionsByServer == null) {
+         regionsByServer = new TreeMap<ServerName, List<String>>();
+         result.put(tableName, regionsByServer);
+       }
+
+       List<String> regionNames = regionsByServer.get(serverName);
+       if (regionNames == null) {
+         regionNames = new LinkedList<String>();
+         regionsByServer.put(serverName, regionNames);
+       }
+
+       regionNames.add(regionLoad.getNameAsString());
+     }
+   }
+   return result;
+ }
+
+ /**
+ * Looking up the group of table "nonexistent" is expected to raise a ConstraintException.
+ */
+ @Test(expected = ConstraintException.class)
+ public void testGroupInfoOfTableNonExistent() throws Exception {
+ groupAdmin.getGroupInfoOfTable(TableName.valueOf("nonexistent"));
+ }
+
+ /**
+  * Creates a table with one family through the createTable overload that takes start and
+  * end keys and a region count of 10; passes if creation does not throw.
+  */
+ @Test
+ public void testCreateMultiRegion() throws IOException {
+   LOG.info("testCreateMultiRegion");
+   byte[][] families = {Bytes.toBytes("f")};
+   byte[] start = {0,2,4,6,8};
+   byte[] end = {1,3,5,7,9};
+   byte[] tableName = Bytes.toBytes(tablePrefix + "_testCreateMultiRegion");
+   TEST_UTIL.createTable(tableName, families, 1, start, end, 10);
+ }
+
+ /**
+  * Creates a table, waits until it appears in the region map and then deletes it again.
+  */
+ @Test
+ public void testCreateAndDrop() throws Exception {
+   LOG.info("testCreateAndDrop");
+
+   final TableName tableName = TableName.valueOf(tablePrefix + "_testCreateAndDrop");
+   TEST_UTIL.createTable(tableName, Bytes.toBytes("cf"));
+
+   // Block until the new table has been assigned.
+   Waiter.Predicate<Exception> tableAssigned = new Waiter.Predicate<Exception>() {
+     @Override
+     public boolean evaluate() throws Exception {
+       return getTableRegionMap().get(tableName) != null;
+     }
+   };
+   TEST_UTIL.waitFor(WAIT_TIMEOUT, tableAssigned);
+
+   TEST_UTIL.deleteTable(tableName);
+ }
+
+
+ /**
+  * Creates two single-server groups, checks the resulting group and server counts, then
+  * moves the servers back to the default group and removes both groups again.
+  */
+ @Test
+ public void testSimpleRegionServerMove() throws IOException,
+     InterruptedException {
+   LOG.info("testSimpleRegionServerMove");
+
+   GroupInfo appInfo = addGroup(groupAdmin, groupPrefix + rand.nextInt(), 1);
+   GroupInfo adminInfo = addGroup(groupAdmin, groupPrefix + rand.nextInt(), 1);
+   GroupInfo dInfo = groupAdmin.getGroupInfo(GroupInfo.DEFAULT_GROUP);
+   assertEquals(3, groupAdmin.listGroups().size());
+   assertEquals(1, adminInfo.getServers().size());
+   assertEquals(1, appInfo.getServers().size());
+   // Two servers were handed to the new groups, the rest stay in the default group.
+   assertEquals(admin.getClusterStatus().getServers().size() - 2, dInfo.getServers().size());
+   groupAdmin.moveServers(appInfo.getServers(),
+       GroupInfo.DEFAULT_GROUP);
+   groupAdmin.removeGroup(appInfo.getName());
+   groupAdmin.moveServers(adminInfo.getServers(),
+       GroupInfo.DEFAULT_GROUP);
+   groupAdmin.removeGroup(adminInfo.getName());
+   // Expected value first, so a failure message reports the counts the right way round.
+   assertEquals(1, groupAdmin.listGroups().size());
+ }
+
+ @Test
+ public void testMoveServers() throws Exception {
+ LOG.info("testMoveServers");
+
+ //create groups and assign servers
+ addGroup(groupAdmin, "bar", 3);
+ groupAdmin.addGroup("foo");
+
+ GroupInfo barGroup = groupAdmin.getGroupInfo("bar");
+ GroupInfo fooGroup = groupAdmin.getGroupInfo("foo");
+ assertEquals(3, barGroup.getServers().size());
+ assertEquals(0, fooGroup.getServers().size());
+
+ //test fail bogus server move
+ try {
+ groupAdmin.moveServers(Sets.newHashSet(HostPort.valueOf("foo:9999")),"foo");
+ fail("Bogus servers shouldn't have been successfully moved.");
+ } catch(IOException ex) {
+ String exp = "Server foo:9999 is not an online server in default group.";
+ String msg = "Expected '"+exp+"' in exception message: ";
+ assertTrue(msg+" "+ex.getMessage(), ex.getMessage().contains(exp));
+ }
+
+ //test success case
+ LOG.info("moving servers "+barGroup.getServers()+" to group foo");
+ groupAdmin.moveServers(barGroup.getServers(), fooGroup.getName());
+
+ barGroup = groupAdmin.getGroupInfo("bar");
+ fooGroup = groupAdmin.getGroupInfo("foo");
+ assertEquals(0,barGroup.getServers().size());
+ assertEquals(3,fooGroup.getServers().size());
+
+ LOG.info("moving servers "+fooGroup.getServers()+" to group default");
+ groupAdmin.moveServers(fooGroup.getServers(), GroupInfo.DEFAULT_GROUP);
+
+ TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return admin.getClusterStatus().getServers().size() ==
+ groupAdmin.getGroupInfo(GroupInfo.DEFAULT_GROUP).getServers().size();
+ }
+ });
+
+ fooGroup = groupAdmin.getGroupInfo("foo");
+ assertEquals(0, fooGroup.getServers().size());
+
+ //test group removal
+ LOG.info("Remove group "+barGroup.getName());
+ groupAdmin.removeGroup(barGroup.getName());
+ assertEquals(null, groupAdmin.getGroupInfo(barGroup.getName()));
+ LOG.info("Remove group "+fooGroup.getName());
+ groupAdmin.removeGroup(fooGroup.getName());
+ assertEquals(null, groupAdmin.getGroupInfo(fooGroup.getName()));
+ }
+
+ /**
+  * Moves a five-region table from the default group into a new two-server group, waits
+  * until all five regions are hosted by servers of that group, then checks that the table
+  * stays in the group across a truncate and leaves it when the table is deleted.
+  */
+ @Test
+ public void testTableMoveTruncateAndDrop() throws Exception {
+   LOG.info("testTableMove");
+
+   final TableName tableName = TableName.valueOf(tablePrefix + "_testTableMoveAndDrop");
+   final byte[] familyNameBytes = Bytes.toBytes("f");
+   String newGroupName = "g_" + rand.nextInt();
+   final GroupInfo newGroup = addGroup(groupAdmin, newGroupName, 2);
+
+   TEST_UTIL.createMultiRegionTable(tableName, familyNameBytes, 5);
+   TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+     @Override
+     public boolean evaluate() throws Exception {
+       // Use a single snapshot: a second lookup could return null and throw an NPE.
+       List<String> regions = getTableRegionMap().get(tableName);
+       return regions != null && regions.size() >= 5;
+     }
+   });
+
+   GroupInfo tableGrp = groupAdmin.getGroupInfoOfTable(tableName);
+   // assertEquals reports both names on failure, unlike assertTrue(a.equals(b)).
+   assertEquals(GroupInfo.DEFAULT_GROUP, tableGrp.getName());
+
+   //change table's group
+   LOG.info("Moving table "+tableName+" to "+newGroup.getName());
+   groupAdmin.moveTables(Sets.newHashSet(tableName), newGroup.getName());
+
+   //verify group change
+   assertEquals(newGroup.getName(),
+       groupAdmin.getGroupInfoOfTable(tableName).getName());
+
+   // Wait until all five regions are hosted by servers of the new group.
+   TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+     @Override
+     public boolean evaluate() throws Exception {
+       Map<ServerName, List<String>> serverMap = getTableServerRegionMap().get(tableName);
+       int count = 0;
+       if (serverMap != null) {
+         for (Map.Entry<ServerName, List<String>> entry : serverMap.entrySet()) {
+           if (newGroup.containsServer(entry.getKey().getHostPort())) {
+             count += entry.getValue().size();
+           }
+         }
+       }
+       return count == 5;
+     }
+   });
+
+   //test truncate
+   admin.disableTable(tableName);
+   admin.truncateTable(tableName, true);
+   assertEquals(1, groupAdmin.getGroupInfo(newGroup.getName()).getTables().size());
+   assertEquals(tableName, groupAdmin.getGroupInfo(newGroup.getName()).getTables().first());
+
+   //verify removed table is removed from group
+   TEST_UTIL.deleteTable(tableName);
+   assertEquals(0, groupAdmin.getGroupInfo(newGroup.getName()).getTables().size());
+ }
+
+ /**
+  * Puts a six-region table into a new three-server group, piles all regions onto one
+  * server, then checks that balancing the default group leaves them untouched and that
+  * balancing the new group spreads them two per server.
+  */
+ @Test
+ public void testGroupBalance() throws Exception {
+   LOG.info("testGroupBalance");
+   String newGroupName = "g_" + rand.nextInt();
+   final GroupInfo newGroup = addGroup(groupAdmin, newGroupName, 3);
+
+   final TableName tableName = TableName.valueOf(tablePrefix+"_ns", "testGroupBalance");
+   admin.createNamespace(
+       NamespaceDescriptor.create(tableName.getNamespaceAsString())
+           .addConfiguration(GroupInfo.NAMESPACEDESC_PROP_GROUP, newGroupName).build());
+   final HTableDescriptor desc = new HTableDescriptor(tableName);
+   desc.addFamily(new HColumnDescriptor("f"));
+   byte [] startKey = Bytes.toBytes("aaaaa");
+   byte [] endKey = Bytes.toBytes("zzzzz");
+   admin.createTable(desc, startKey, endKey, 6);
+   TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+     @Override
+     public boolean evaluate() throws Exception {
+       List<String> regions = getTableRegionMap().get(tableName);
+       return regions != null && regions.size() >= 6;
+     }
+   });
+
+   //make assignment uneven, move all regions to one server
+   Map<ServerName,List<String>> assignMap =
+       getTableServerRegionMap().get(tableName);
+   final ServerName first = assignMap.keySet().iterator().next();
+   for(HRegionInfo region: admin.getTableRegions(tableName)) {
+     // assignMap holds region names as strings, so compare by name; testing the
+     // HRegionInfo object itself against a List<String> can never match.
+     if(!assignMap.get(first).contains(region.getRegionNameAsString())) {
+       admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(first.getServerName()));
+     }
+   }
+   TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+     @Override
+     public boolean evaluate() throws Exception {
+       // Keep waiting while no load is reported for the table or for the target server;
+       // ending the wait there would only make the assertion below fail with an NPE.
+       Map<ServerName, List<String>> map = getTableServerRegionMap().get(tableName);
+       if (map == null) {
+         return false;
+       }
+       List<String> regions = map.get(first);
+       if (regions == null) {
+         return false;
+       }
+       return regions.size() >= 6;
+     }
+   });
+
+   //balance the other group and make sure it doesn't affect the new group
+   groupAdmin.balanceGroup(GroupInfo.DEFAULT_GROUP);
+   assertEquals(6, getTableServerRegionMap().get(tableName).get(first).size());
+
+   groupAdmin.balanceGroup(newGroupName);
+   TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+     @Override
+     public boolean evaluate() throws Exception {
+       Map<ServerName, List<String>> map = getTableServerRegionMap().get(tableName);
+       if (map == null) {
+         // No load reported for the table in this snapshot; try again.
+         return false;
+       }
+       for (List<String> regions : map.values()) {
+         if (2 != regions.size()) {
+           return false;
+         }
+       }
+       return true;
+     }
+   });
+ }
+
+ /**
+  * Checks that a region is not opened by a server that was moved into another group, even
+  * when a move of the region onto that server is explicitly requested.
+  * <p>
+  * Steps: create a new group and a six-region table, pick one region of the table and one
+  * live server that is not a member of the new group, move that server into the new group,
+  * request a move of the region onto it, and finally verify the server does not host it.
+  */
+ @Test
+ public void testRegionMove() throws Exception {
+   LOG.info("testRegionMove");
+
+   final GroupInfo newGroup = addGroup(groupAdmin, "g_" + rand.nextInt(), 1);
+   final TableName tableName = TableName.valueOf(tablePrefix + rand.nextInt());
+   final byte[] familyNameBytes = Bytes.toBytes("f");
+   // All the regions created below will be assigned to the default group.
+   TEST_UTIL.createMultiRegionTable(tableName, familyNameBytes, 6);
+   TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+     @Override
+     public boolean evaluate() throws Exception {
+       List<String> regions = getTableRegionMap().get(tableName);
+       if (regions == null) {
+         return false;
+       }
+       return regions.size() >= 6;
+     }
+   });
+
+   //get target region to move: the first region listed for any server hosting the table
+   Map<ServerName,List<String>> assignMap =
+       getTableServerRegionMap().get(tableName);
+   String targetRegion = null;
+   for (List<String> serverRegions : assignMap.values()) {
+     if (!serverRegions.isEmpty()) {
+       targetRegion = serverRegions.get(0);
+       break;
+     }
+   }
+   if (targetRegion == null) {
+     fail("No region of " + tableName + " found to move");
+   }
+
+   //get server which is not a member of new group
+   ServerName targetServer = null;
+   for (ServerName server : admin.getClusterStatus().getServers()) {
+     if (!newGroup.containsServer(server.getHostPort())) {
+       targetServer = server;
+       break;
+     }
+   }
+   if (targetServer == null) {
+     fail("No live server found outside of group " + newGroup.getName());
+   }
+
+   final AdminProtos.AdminService.BlockingInterface targetRS =
+       admin.getConnection().getAdmin(targetServer);
+
+   //move target server to group and wait until it no longer hosts any region
+   groupAdmin.moveServers(Sets.newHashSet(targetServer.getHostPort()),
+       newGroup.getName());
+   TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+     @Override
+     public boolean evaluate() throws Exception {
+       return ProtobufUtil.getOnlineRegions(targetRS).size() <= 0;
+     }
+   });
+
+   // Lets move this region to the new group.
+   TEST_UTIL.getHBaseAdmin().move(
+       Bytes.toBytes(HRegionInfo.encodeRegionName(Bytes.toBytes(targetRegion))),
+       Bytes.toBytes(targetServer.getServerName()));
+   TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+     @Override
+     public boolean evaluate() throws Exception {
+       return
+           getTableRegionMap().get(tableName) != null &&
+           getTableRegionMap().get(tableName).size() == 6 &&
+           admin.getClusterStatus().getRegionsInTransition().size() < 1;
+     }
+   });
+
+   //verify that targetServer didn't open it
+   //(compare by region name: the online list holds HRegionInfo objects, so a
+   //List.contains() check against the String name could never match)
+   for (HRegionInfo onlineRegion : ProtobufUtil.getOnlineRegions(targetRS)) {
+     if (targetRegion.equals(onlineRegion.getRegionNameAsString())) {
+       fail("Target server opened region " + targetRegion);
+     }
+   }
+ }
+
+ /**
+  * Exercises the failure paths of group removal.
+  * <p>
+  * With a table and servers in group "bar": removing the group must fail, and moving all of
+  * its servers away must fail. After the table is moved back to the default group, removal
+  * must still fail, and only after the servers are moved out as well does removal succeed,
+  * leaving a single group.
+  */
+ @Test
+ public void testFailRemoveGroup() throws IOException, InterruptedException {
+   LOG.info("testFailRemoveGroup");
+
+   addGroup(groupAdmin, "bar", 3);
+   TableName tableName = TableName.valueOf(tablePrefix+"_my_table");
+   TEST_UTIL.createTable(tableName, Bytes.toBytes("f"));
+   groupAdmin.moveTables(Sets.newHashSet(tableName), "bar");
+   GroupInfo barGroup = groupAdmin.getGroupInfo("bar");
+
+   //group is not empty therefore it should fail
+   try {
+     groupAdmin.removeGroup(barGroup.getName());
+     fail("Expected remove group to fail");
+   } catch(IOException e) {
+     //expected: the group still holds a table and servers
+   }
+
+   //group cannot lose all its servers therefore it should fail
+   try {
+     groupAdmin.moveServers(barGroup.getServers(), GroupInfo.DEFAULT_GROUP);
+     fail("Expected move servers to fail");
+   } catch(IOException e) {
+     //expected: the group still holds a table
+   }
+
+   //with the table gone the group still has servers, so removal should keep failing
+   groupAdmin.moveTables(barGroup.getTables(), GroupInfo.DEFAULT_GROUP);
+   try {
+     groupAdmin.removeGroup(barGroup.getName());
+     fail("Expected remove group to fail");
+   } catch(IOException e) {
+     //expected: the group still holds servers
+   }
+
+   //once both tables and servers are moved out the group can be removed
+   groupAdmin.moveServers(barGroup.getServers(), GroupInfo.DEFAULT_GROUP);
+   groupAdmin.removeGroup(barGroup.getName());
+
+   assertEquals(1, groupAdmin.listGroups().size());
+ }
+
+ /**
+  * Stops the only server of a group and checks that the group's region comes back online
+  * once another server is moved into the group.
+  * <p>
+  * The table lives in a namespace whose descriptor is configured with the group's name. After
+  * the group's server is stopped, the first server of the default group is moved into the
+  * group, the region is assigned again, and the new server is expected to host exactly one
+  * region, belonging to the test table.
+  */
+ @Test
+ public void testKillRS() throws Exception {
+   LOG.info("testKillRS");
+   GroupInfo appInfo = addGroup(groupAdmin, "appInfo", 1);
+
+   final TableName tableName = TableName.valueOf(tablePrefix+"_ns", "_testKillRS");
+   admin.createNamespace(
+       NamespaceDescriptor.create(tableName.getNamespaceAsString())
+           .addConfiguration(GroupInfo.NAMESPACEDESC_PROP_GROUP, appInfo.getName()).build());
+   final HTableDescriptor desc = new HTableDescriptor(tableName);
+   desc.addFamily(new HColumnDescriptor("f"));
+   admin.createTable(desc);
+   //wait for created table to be assigned
+   TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+     @Override
+     public boolean evaluate() throws Exception {
+       return getTableRegionMap().get(desc.getTableName()) != null;
+     }
+   });
+
+   final ServerName stoppedServer =
+       ServerName.parseServerName(appInfo.getServers().first().toString());
+   AdminProtos.AdminService.BlockingInterface stoppedRS =
+       admin.getConnection().getAdmin(stoppedServer);
+   //check the count first so an empty list shows up as an assertion failure
+   //instead of an IndexOutOfBoundsException
+   List<HRegionInfo> regionsBeforeStop = ProtobufUtil.getOnlineRegions(stoppedRS);
+   assertEquals(1, regionsBeforeStop.size());
+   HRegionInfo targetRegion = regionsBeforeStop.get(0);
+
+   try {
+     //stopping may cause an exception
+     //due to the connection loss
+     stoppedRS.stopServer(null,
+         AdminProtos.StopServerRequest.newBuilder().setReason("Die").build());
+   } catch(Exception e) {
+     //ignored on purpose, see above
+   }
+   //the stop request is asynchronous, so wait for the server to leave the
+   //live server list rather than asserting on it immediately
+   TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+     @Override
+     public boolean evaluate() throws Exception {
+       return !cluster.getClusterStatus().getServers().contains(stoppedServer);
+     }
+   });
+
+   //wait for regions in transition to settle after the server stop
+   TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+     @Override
+     public boolean evaluate() throws Exception {
+       return cluster.getClusterStatus().getRegionsInTransition().size() == 0;
+     }
+   });
+
+   //give the group a new server taken from the default group and assign the region again
+   TreeSet<HostPort> newServers = Sets.newTreeSet();
+   newServers.add(groupAdmin.getGroupInfo(GroupInfo.DEFAULT_GROUP).getServers().first());
+   groupAdmin.moveServers(newServers, appInfo.getName());
+   admin.assign(targetRegion.getRegionName());
+
+   //wait for region to be assigned
+   TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+     @Override
+     public boolean evaluate() throws Exception {
+       return cluster.getClusterStatus().getRegionsInTransition().size() == 0;
+     }
+   });
+
+   //the replacement server must now host exactly the test table's region
+   ServerName replacementServer = ServerName.parseServerName(newServers.first().toString());
+   AdminProtos.AdminService.BlockingInterface replacementRS =
+       admin.getConnection().getAdmin(replacementServer);
+   List<HRegionInfo> regionsAfterMove = ProtobufUtil.getOnlineRegions(replacementRS);
+   assertEquals(1, regionsAfterMove.size());
+   assertEquals(tableName, regionsAfterMove.get(0).getTable());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/66e16163/hbase-server/src/test/java/org/apache/hadoop/hbase/group/TestGroupsOfflineMode.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/group/TestGroupsOfflineMode.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/group/TestGroupsOfflineMode.java
new file mode 100644
index 0000000..d5da85d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/group/TestGroupsOfflineMode.java
@@ -0,0 +1,181 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.group;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseCluster;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+
+/**
+ * This tests that GroupBasedBalancer will use data in zk
+ * to do balancing during master startup.
+ * This does not test retain assignment.
+ */
+@Category(MediumTests.class)
+public class TestGroupsOfflineMode {
+  private static final org.apache.commons.logging.Log LOG = LogFactory.getLog(TestGroupsOfflineMode.class);
+  private static HMaster master;
+  private static HBaseAdmin hbaseAdmin;
+  private static HBaseTestingUtility TEST_UTIL;
+  private static HBaseCluster cluster;
+  /** Upper bound, in milliseconds, for every wait in this test (five minutes). */
+  public final static long WAIT_TIMEOUT = 60000*5;
+
+  /**
+   * Starts a mini cluster (started with arguments 2 and 3) that uses
+   * {@link GroupBasedLoadBalancer}, switches the balancer off and waits until the master is
+   * initialized, the balancer reports online and at least three servers are online.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility();
+    TEST_UTIL.getConfiguration().set(
+        HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
+        GroupBasedLoadBalancer.class.getName());
+    TEST_UTIL.getConfiguration().set(
+        ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
+        "1");
+    TEST_UTIL.startMiniCluster(2, 3);
+    cluster = TEST_UTIL.getHBaseCluster();
+    master = ((MiniHBaseCluster)cluster).getMaster();
+    master.balanceSwitch(false);
+    hbaseAdmin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+    //wait till the balancer is in online mode
+    TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return master.isInitialized() &&
+            ((GroupBasedLoadBalancer) master.getLoadBalancer()).isOnline() &&
+            master.getServerManager().getOnlineServersList().size() >= 3;
+      }
+    });
+  }
+
+  /**
+   * Closes the admin created in {@link #setUp()} and shuts the mini cluster down. The cluster
+   * shutdown runs even if closing the admin fails.
+   */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    try {
+      //hbaseAdmin is null when setUp failed before creating it
+      if (hbaseAdmin != null) {
+        hbaseAdmin.close();
+      }
+    } finally {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  /**
+   * Moves the group table into a group whose only server is then stopped, stops the active
+   * master, and checks that the master taking over runs the group manager in offline mode
+   * with the expected group membership. A second server is then stopped and the test table is
+   * expected to fail over to the remaining server while the group table stays unassigned.
+   */
+  @Test
+  public void testOffline() throws Exception {
+    //table should be after group table name
+    //so it gets assigned later
+    final TableName failoverTable = TableName.valueOf("testOffline");
+    TEST_UTIL.createTable(failoverTable, Bytes.toBytes("f"));
+
+    GroupAdminClient groupAdmin = new GroupAdminClient(TEST_UTIL.getConfiguration());
+
+    final HRegionServer killRS = ((MiniHBaseCluster)cluster).getRegionServer(0);
+    final HRegionServer groupRS = ((MiniHBaseCluster)cluster).getRegionServer(1);
+    final HRegionServer failoverRS = ((MiniHBaseCluster)cluster).getRegionServer(2);
+
+    String newGroup = "my_group";
+    groupAdmin.addGroup(newGroup);
+    //NOTE(review): the moves below target failoverRS itself and the wait is for a non-empty
+    //region load, although the log message speaks of unassignments - confirm the intent
+    if(master.getAssignmentManager().getRegionStates().getRegionAssignments()
+        .containsValue(failoverRS.getServerName())) {
+      for(HRegionInfo regionInfo: hbaseAdmin.getOnlineRegions(failoverRS.getServerName())) {
+        hbaseAdmin.move(regionInfo.getEncodedNameAsBytes(),
+            Bytes.toBytes(failoverRS.getServerName().getServerName()));
+      }
+      LOG.info("Waiting for region unassignments on failover RS...");
+      TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return master.getServerManager().getLoad(failoverRS.getServerName())
+              .getRegionsLoad().size() > 0;
+        }
+      });
+    }
+
+    //move server to group and make sure all tables are assigned
+    groupAdmin.moveServers(Sets.newHashSet(groupRS.getServerName().getHostPort()), newGroup);
+    TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return groupRS.getNumberOfOnlineRegions() < 1 &&
+            master.getAssignmentManager().getRegionStates().getRegionsInTransition().size() < 1;
+      }
+    });
+    //move table to group and wait
+    groupAdmin.moveTables(Sets.newHashSet(GroupInfoManager.GROUP_TABLE_NAME), newGroup);
+    LOG.info("Waiting for move table...");
+    TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return groupRS.getNumberOfOnlineRegions() == 1;
+      }
+    });
+
+    groupRS.stop("die");
+    //race condition here
+    TEST_UTIL.getHBaseCluster().getMaster().stopMaster();
+    LOG.info("Waiting for offline mode...");
+    TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return TEST_UTIL.getHBaseCluster().getMaster() != null &&
+            TEST_UTIL.getHBaseCluster().getMaster().isActiveMaster() &&
+            TEST_UTIL.getHBaseCluster().getMaster().isInitialized() &&
+            TEST_UTIL.getHBaseCluster().getMaster().getServerManager().getOnlineServers().size()
+                <= 3;
+      }
+    });
+
+
+    GroupInfoManager groupMgr =
+        ((GroupBasedLoadBalancer)TEST_UTIL.getHBaseCluster().getMaster().getLoadBalancer())
+            .getGroupInfoManager();
+    //make sure balancer is in offline mode, since this is what we're testing
+    assertFalse(groupMgr.isOnline());
+    //verify the group affiliation that's loaded from ZK instead of tables
+    assertEquals(newGroup,
+        groupMgr.getGroupOfTable(GroupInfoManager.GROUP_TABLE_NAME));
+    assertEquals(GroupInfo.DEFAULT_GROUP, groupMgr.getGroupOfTable(failoverTable));
+
+    //kill final regionserver to see the failover happens for all tables
+    //except GROUP table since its group does not have any online RS
+    killRS.stop("die");
+    master = TEST_UTIL.getHBaseCluster().getMaster();
+    LOG.info("Waiting for new table assignment...");
+    TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return failoverRS.getOnlineRegions(failoverTable).size() >= 1;
+      }
+    });
+    assertEquals(0, failoverRS.getOnlineRegions(GroupInfoManager.GROUP_TABLE_NAME).size());
+
+    //need this for minicluster to shutdown cleanly
+    master.stopMaster();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/66e16163/hbase-server/src/test/java/org/apache/hadoop/hbase/group/VerifyingGroupAdminClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/group/VerifyingGroupAdminClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/group/VerifyingGroupAdminClient.java
new file mode 100644
index 0000000..2b0c2df
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/group/VerifyingGroupAdminClient.java
@@ -0,0 +1,155 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.group;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HostPort;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.junit.Assert;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+
+/**
+ * {@link GroupAdmin} decorator for tests. Every call is delegated to the wrapped admin; after
+ * each mutating call (add/remove group, move servers/tables) {@link #verify()} checks that the
+ * group information stored in the group table, returned by the wrapped admin and stored in
+ * ZooKeeper all agree.
+ */
+public class VerifyingGroupAdminClient implements GroupAdmin {
+  /** Connection backing {@link #table}; owned by this instance and closed in {@link #close()}. */
+  private final org.apache.hadoop.hbase.client.HConnection connection;
+  private final HTableInterface table;
+  private final ZooKeeperWatcher zkw;
+  /** Not referenced by any method of this class. */
+  private final GroupSerDe serDe;
+  private final GroupAdmin wrapped;
+
+  /**
+   * @param groupAdmin the admin every call is delegated to
+   * @param conf configuration used to open the group table and the ZooKeeper watcher
+   * @throws IOException if the table or the ZooKeeper watcher cannot be created
+   */
+  public VerifyingGroupAdminClient(GroupAdmin groupAdmin, Configuration conf)
+      throws IOException {
+    wrapped = groupAdmin;
+    connection = HConnectionManager.createConnection(conf);
+    table = connection.getTable(GroupInfoManager.GROUP_TABLE_NAME);
+    zkw = new ZooKeeperWatcher(conf, this.getClass().getSimpleName(), null);
+    serDe = new GroupSerDe();
+  }
+
+  @Override
+  public void addGroup(String groupName) throws IOException {
+    wrapped.addGroup(groupName);
+    verify();
+  }
+
+  @Override
+  public GroupInfo getGroupInfo(String groupName) throws IOException {
+    return wrapped.getGroupInfo(groupName);
+  }
+
+  @Override
+  public GroupInfo getGroupInfoOfTable(TableName tableName) throws IOException {
+    return wrapped.getGroupInfoOfTable(tableName);
+  }
+
+  @Override
+  public void moveServers(Set<HostPort> servers, String targetGroup) throws IOException {
+    wrapped.moveServers(servers, targetGroup);
+    verify();
+  }
+
+  @Override
+  public void moveTables(Set<TableName> tables, String targetGroup) throws IOException {
+    wrapped.moveTables(tables, targetGroup);
+    verify();
+  }
+
+  @Override
+  public void removeGroup(String name) throws IOException {
+    wrapped.removeGroup(name);
+    verify();
+  }
+
+  @Override
+  public boolean balanceGroup(String name) throws IOException {
+    return wrapped.balanceGroup(name);
+  }
+
+  @Override
+  public List<GroupInfo> listGroups() throws IOException {
+    return wrapped.listGroups();
+  }
+
+  @Override
+  public GroupInfo getGroupOfServer(HostPort hostPort) throws IOException {
+    return wrapped.getGroupOfServer(hostPort);
+  }
+
+  /**
+   * Asserts that the groups stored in the group table equal the groups listed by the wrapped
+   * admin, and that the groups stored under the "groupInfo" znode match the table as well.
+   *
+   * @throws IOException if reading the table fails, or wrapping any ZooKeeper or
+   *         deserialization error hit while reading the znodes
+   */
+  public void verify() throws IOException {
+    Get get = new Get(GroupInfoManager.ROW_KEY);
+    get.addFamily(GroupInfoManager.META_FAMILY_BYTES);
+    Map<String, GroupInfo> groupMap = Maps.newHashMap();
+    Set<GroupInfo> zList = Sets.newHashSet();
+
+    //groups as persisted in the group table, keyed by group name
+    Result result = table.get(get);
+    if(!result.isEmpty()) {
+      NavigableMap<byte[],byte[]> familyMap =
+          result.getNoVersionMap().get(GroupInfoManager.META_FAMILY_BYTES);
+      for(byte[] serializedGroup: familyMap.values()) {
+        RSGroupProtos.GroupInfo proto = RSGroupProtos.GroupInfo.parseFrom(serializedGroup);
+        GroupInfo groupInfo = ProtobufUtil.toGroupInfo(proto);
+        groupMap.put(groupInfo.getName(), groupInfo);
+      }
+    }
+    Assert.assertEquals(Sets.newHashSet(groupMap.values()),
+        Sets.newHashSet(wrapped.listGroups()));
+
+    //groups as persisted in ZooKeeper
+    try {
+      String groupBasePath = ZKUtil.joinZNode(zkw.baseZNode, "groupInfo");
+      for(String znode: ZKUtil.listChildrenNoWatch(zkw, groupBasePath)) {
+        byte[] data = ZKUtil.getData(zkw, ZKUtil.joinZNode(groupBasePath, znode));
+        //guard against a missing payload; presumably null when the znode vanished - verify
+        if(data != null && data.length > 0) {
+          ProtobufUtil.expectPBMagicPrefix(data);
+          int magicLength = ProtobufUtil.lengthOfPBMagic();
+          //third argument is the number of bytes to read, not an end index
+          ByteArrayInputStream bis = new ByteArrayInputStream(
+              data, magicLength, data.length - magicLength);
+          zList.add(ProtobufUtil.toGroupInfo(RSGroupProtos.GroupInfo.parseFrom(bis)));
+        }
+      }
+      Assert.assertEquals(groupMap.size(), zList.size());
+      for(GroupInfo groupInfo: zList) {
+        //assertEquals also reports a group missing from the table as a failure
+        //instead of throwing a NullPointerException
+        Assert.assertEquals(groupMap.get(groupInfo.getName()), groupInfo);
+      }
+    } catch (KeeperException e) {
+      throw new IOException("ZK verification failed", e);
+    } catch (DeserializationException e) {
+      throw new IOException("ZK verification failed", e);
+    }
+  }
+
+  /**
+   * Releases the table, ZooKeeper watcher and connection opened by the constructor. The
+   * wrapped admin is left open.
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      table.close();
+    } finally {
+      try {
+        zkw.close();
+      } finally {
+        connection.close();
+      }
+    }
+  }
+}