You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ga...@apache.org on 2012/09/18 08:32:58 UTC
svn commit: r1387001 [5/5] - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/client/
main/java/org/apache/hadoop/hbase/client/coprocessor/
main/java/org/apache/hadoop/hbase/coprocessor/
main/java/org/apache/hadoop/hbase/coprocessor/...
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Sep 18 06:32:57 2012
@@ -61,6 +61,7 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import com.google.protobuf.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -115,6 +116,7 @@ import org.apache.hadoop.hbase.ipc.HBase
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.metrics.OperationMetrics;
import org.apache.hadoop.hbase.regionserver.metrics.RegionMetricsStorage;
@@ -143,6 +145,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MutableClassToInstanceMap;
+import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
+
/**
* HRegion stores data for a certain region of a table. It stores all columns
* for each row. A given table consists of one or more HRegions.
@@ -215,6 +219,9 @@ public class HRegion implements HeapSize
private Map<String, Class<? extends CoprocessorProtocol>>
protocolHandlerNames = Maps.newHashMap();
+ // TODO: account for each registered handler in HeapSize computation
+ private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
+
/**
* Temporary subdirectory of the region directory used for compaction output.
*/
@@ -5027,7 +5034,7 @@ public class HRegion implements HeapSize
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
- 36 * ClassSize.REFERENCE + Bytes.SIZEOF_INT +
+ 37 * ClassSize.REFERENCE + Bytes.SIZEOF_INT +
(7 * Bytes.SIZEOF_LONG) +
Bytes.SIZEOF_BOOLEAN);
@@ -5085,6 +5092,7 @@ public class HRegion implements HeapSize
* @return {@code true} if the registration was successful, {@code false}
* otherwise
*/
+ @Deprecated
public <T extends CoprocessorProtocol> boolean registerProtocol(
Class<T> protocol, T handler) {
@@ -5109,6 +5117,41 @@ public class HRegion implements HeapSize
}
/**
+ * Registers a new protocol buffer {@link Service} subclass as a coprocessor endpoint to
+ * be available for handling
+ * {@link HRegion#execService(com.google.protobuf.RpcController, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall)} calls.
+ *
+ * <p>
+ * Only a single instance may be registered per region for a given {@link Service} subclass (the
+ * instances are keyed on {@link com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}).
+ * After the first registration, subsequent calls with the same service name will fail with
+ * a return value of {@code false}.
+ * </p>
+ * @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint
+ * @return {@code true} if the registration was successful, {@code false}
+ * otherwise
+ */
+ public boolean registerService(Service instance) {
+ /*
+ * No stacking of instances is allowed for a single service name
+ */
+ Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
+ if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
+ LOG.error("Coprocessor service "+serviceDesc.getFullName()+
+ " already registered, rejecting request from "+instance
+ );
+ return false;
+ }
+
+ coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Registered coprocessor service: region="+
+ Bytes.toStringBinary(getRegionName())+" service="+serviceDesc.getFullName());
+ }
+ return true;
+ }
+
+ /**
* Executes a single {@link org.apache.hadoop.hbase.ipc.CoprocessorProtocol}
* method using the registered protocol handlers.
* {@link CoprocessorProtocol} implementations must be registered via the
@@ -5123,6 +5166,7 @@ public class HRegion implements HeapSize
* occurs during the invocation
* @see org.apache.hadoop.hbase.regionserver.HRegion#registerProtocol(Class, org.apache.hadoop.hbase.ipc.CoprocessorProtocol)
*/
+ @Deprecated
public ExecResult exec(Exec call)
throws IOException {
Class<? extends CoprocessorProtocol> protocol = call.getProtocol();
@@ -5174,6 +5218,55 @@ public class HRegion implements HeapSize
return new ExecResult(getRegionName(), value);
}
+ /**
+ * Executes a single protocol buffer coprocessor endpoint {@link Service} method using
+ * the registered protocol handlers. {@link Service} implementations must be registered via the
+ * {@link org.apache.hadoop.hbase.regionserver.HRegion#registerService(com.google.protobuf.Service)}
+ * method before they are available.
+ *
+ * @param controller an {@code RpcController} implementation to pass to the invoked service
+ * @param call a {@code CoprocessorServiceCall} instance identifying the service, method,
+ * and parameters for the method invocation
+ * @return a protocol buffer {@code Message} instance containing the method's result
+ * @throws IOException if no registered service handler is found or an error
+ * occurs during the invocation
+ * @see org.apache.hadoop.hbase.regionserver.HRegion#registerService(com.google.protobuf.Service)
+ */
+ public Message execService(RpcController controller, CoprocessorServiceCall call)
+ throws IOException {
+ String serviceName = call.getServiceName();
+ String methodName = call.getMethodName();
+ if (!coprocessorServiceHandlers.containsKey(serviceName)) {
+ throw new HBaseRPC.UnknownProtocolException(null,
+ "No registered coprocessor service found for name "+serviceName+
+ " in region "+Bytes.toStringBinary(getRegionName()));
+ }
+
+ Service service = coprocessorServiceHandlers.get(serviceName);
+ Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
+ Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
+ if (methodDesc == null) {
+ throw new HBaseRPC.UnknownProtocolException(service.getClass(),
+ "Unknown method "+methodName+" called on service "+serviceName+
+ " in region "+Bytes.toStringBinary(getRegionName()));
+ }
+
+ Message request = service.getRequestPrototype(methodDesc).newBuilderForType()
+ .mergeFrom(call.getRequest()).build();
+ final Message.Builder responseBuilder =
+ service.getResponsePrototype(methodDesc).newBuilderForType();
+ service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
+ @Override
+ public void run(Message message) {
+ if (message != null) {
+ responseBuilder.mergeFrom(message);
+ }
+ }
+ });
+
+ return responseBuilder.build();
+ }
+
/*
* Process table.
* Do major compaction or list content.
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Sep 18 06:32:57 2012
@@ -54,6 +54,7 @@ import java.util.concurrent.locks.Reentr
import javax.management.ObjectName;
+import com.google.protobuf.Message;
import org.apache.commons.lang.mutable.MutableDouble;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -115,6 +116,7 @@ import org.apache.hadoop.hbase.ipc.HBase
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
@@ -230,6 +232,9 @@ import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
+import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
+import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
+
/**
* HRegionServer makes a set of HRegions available to clients. It checks in with
* the HMaster. There are many HRegionServers in a single HBase deployment.
@@ -3333,6 +3338,31 @@ public class HRegionServer implements C
}
}
+ @Override
+ public CoprocessorServiceResponse execService(final RpcController controller,
+ final CoprocessorServiceRequest request) throws ServiceException {
+ try {
+ requestCount.incrementAndGet();
+ HRegion region = getRegion(request.getRegion());
+ // ignore the passed in controller (from the serialized call)
+ ServerRpcController execController = new ServerRpcController();
+ Message result = region.execService(execController, request.getCall());
+ if (execController.getFailedOn() != null) {
+ throw execController.getFailedOn();
+ }
+ CoprocessorServiceResponse.Builder builder =
+ CoprocessorServiceResponse.newBuilder();
+ builder.setRegion(RequestConverter.buildRegionSpecifier(
+ RegionSpecifierType.REGION_NAME, region.getRegionName()));
+ builder.setValue(
+ builder.getValueBuilder().setName(result.getClass().getName())
+ .setValue(result.toByteString()));
+ return builder.build();
+ } catch (IOException ie) {
+ throw new ServiceException(ie);
+ }
+ }
+
/**
* Execute multiple actions on a table: get, mutate, and/or execCoprocessor
*
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Tue Sep 18 06:32:57 2012
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.client.Pu
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
@@ -216,7 +217,11 @@ public class RegionCoprocessorHost
for (Class c : implClass.getInterfaces()) {
if (CoprocessorProtocol.class.isAssignableFrom(c)) {
region.registerProtocol(c, (CoprocessorProtocol)instance);
- break;
+ }
+ // we allow endpoints to register as both CoprocessorProtocols and Services
+ // for ease of transition
+ if (CoprocessorService.class.isAssignableFrom(c)) {
+ region.registerService( ((CoprocessorService)instance).getService() );
}
}
ConcurrentMap<String, Object> classData;
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java Tue Sep 18 06:32:57 2012
@@ -28,12 +28,15 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -733,6 +736,25 @@ public class RemoteHTable implements HTa
}
@Override
+ public CoprocessorRpcChannel coprocessorService(byte[] row) {
+ throw new UnsupportedOperationException("coprocessorService not implemented");
+ }
+
+ @Override
+ public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
+ byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
+ throws ServiceException, Throwable {
+ throw new UnsupportedOperationException("coprocessorService not implemented");
+ }
+
+ @Override
+ public <T extends Service, R> void coprocessorService(Class<T> service,
+ byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback)
+ throws ServiceException, Throwable {
+ throw new UnsupportedOperationException("coprocessorService not implemented");
+ }
+
+ @Override
public void mutateRow(RowMutations rm) throws IOException {
throw new IOException("atomicMutation not supported");
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java Tue Sep 18 06:32:57 2012
@@ -24,6 +24,9 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -40,18 +43,16 @@ import org.apache.hadoop.hbase.client.In
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
-import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.MasterObserver;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RequestContext;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -69,6 +70,8 @@ import com.google.common.collect.MapMake
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import static org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
+
/**
* Provides basic authorization checks for data access and administrative
* operations.
@@ -101,7 +104,8 @@ import com.google.common.collect.Sets;
* </p>
*/
public class AccessController extends BaseRegionObserver
- implements MasterObserver, AccessControllerProtocol {
+ implements MasterObserver, AccessControllerProtocol,
+ AccessControlService.Interface, CoprocessorService {
/**
* Represents the result of an authorization check for logging and error
* reporting.
@@ -1049,6 +1053,7 @@ public class AccessController extends Ba
* These methods are only allowed to be called against the _acl_ region(s).
* This will be restricted by both client side and endpoint implementations.
*/
+ @Deprecated
@Override
public void grant(UserPermission perm) throws IOException {
// verify it's only running at .acl.
@@ -1079,6 +1084,7 @@ public class AccessController extends Ba
permission.getActions()));
}
+ @Deprecated
@Override
public void revoke(UserPermission perm) throws IOException {
// only allowed to be called on _acl_ region
@@ -1109,6 +1115,7 @@ public class AccessController extends Ba
permission.getActions()));
}
+ @Deprecated
@Override
public List<UserPermission> getUserPermissions(final byte[] tableName) throws IOException {
// only allowed to be called on _acl_ region
@@ -1124,6 +1131,7 @@ public class AccessController extends Ba
}
}
+ @Deprecated
@Override
public void checkPermissions(Permission[] permissions) throws IOException {
byte[] tableName = regionEnv.getRegion().getTableDesc().getName();
@@ -1158,11 +1166,13 @@ public class AccessController extends Ba
}
}
+ @Deprecated
@Override
public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
return PROTOCOL_VERSION;
}
+ @Deprecated
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
@@ -1173,6 +1183,79 @@ public class AccessController extends Ba
"Unexpected protocol requested: "+protocol);
}
+
+ /* ---- Protobuf AccessControlService implementation ---- */
+ @Override
+ public void grant(RpcController controller,
+ AccessControlProtos.GrantRequest request,
+ RpcCallback<AccessControlProtos.GrantResponse> done) {
+ UserPermission perm = ProtobufUtil.toUserPermission(request.getPermission());
+ AccessControlProtos.GrantResponse response = null;
+ try {
+ grant(perm);
+ response = AccessControlProtos.GrantResponse.getDefaultInstance();
+ } catch (IOException ioe) {
+ // pass exception back up
+ ResponseConverter.setControllerException(controller, ioe);
+ }
+ done.run(response);
+ }
+
+ @Override
+ public void revoke(RpcController controller,
+ AccessControlProtos.RevokeRequest request,
+ RpcCallback<AccessControlProtos.RevokeResponse> done) {
+ UserPermission perm = ProtobufUtil.toUserPermission(request.getPermission());
+ AccessControlProtos.RevokeResponse response = null;
+ try {
+ revoke(perm);
+ response = AccessControlProtos.RevokeResponse.getDefaultInstance();
+ } catch (IOException ioe) {
+ // pass exception back up
+ ResponseConverter.setControllerException(controller, ioe);
+ }
+ done.run(response);
+ }
+
+ @Override
+ public void getUserPermissions(RpcController controller,
+ AccessControlProtos.UserPermissionsRequest request,
+ RpcCallback<AccessControlProtos.UserPermissionsResponse> done) {
+ byte[] table = request.getTable().toByteArray();
+ AccessControlProtos.UserPermissionsResponse response = null;
+ try {
+ List<UserPermission> perms = getUserPermissions(table);
+ response = ResponseConverter.buildUserPermissionsResponse(perms);
+ } catch (IOException ioe) {
+ // pass exception back up
+ ResponseConverter.setControllerException(controller, ioe);
+ }
+ done.run(response);
+ }
+
+ @Override
+ public void checkPermissions(RpcController controller,
+ AccessControlProtos.CheckPermissionsRequest request,
+ RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
+ Permission[] perms = new Permission[request.getPermissionCount()];
+ for (int i=0; i < request.getPermissionCount(); i++) {
+ perms[i] = ProtobufUtil.toPermission(request.getPermission(i));
+ }
+ AccessControlProtos.CheckPermissionsResponse response = null;
+ try {
+ checkPermissions(perms);
+ response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
+ } catch (IOException ioe) {
+ ResponseConverter.setControllerException(controller, ioe);
+ }
+ done.run(response);
+ }
+
+ @Override
+ public Service getService() {
+ return AccessControlProtos.AccessControlService.newReflectiveService(this);
+ }
+
private byte[] getTableName(RegionCoprocessorEnvironment e) {
HRegion region = e.getRegion();
byte[] tableName = null;
Modified: hbase/trunk/hbase-server/src/main/protobuf/AccessControl.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/protobuf/AccessControl.proto?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/protobuf/AccessControl.proto (original)
+++ hbase/trunk/hbase-server/src/main/protobuf/AccessControl.proto Tue Sep 18 06:32:57 2012
@@ -52,3 +52,48 @@ message UserTablePermissions {
repeated UserPermissions permissions = 1;
}
+
+message GrantRequest {
+ required UserPermission permission = 1;
+}
+
+message GrantResponse {
+}
+
+message RevokeRequest {
+ required UserPermission permission = 1;
+
+}
+
+message RevokeResponse {
+}
+
+
+message UserPermissionsRequest {
+ required bytes table = 1;
+}
+
+message UserPermissionsResponse {
+ repeated UserPermission permission = 1;
+}
+
+message CheckPermissionsRequest {
+ repeated Permission permission = 1;
+}
+
+message CheckPermissionsResponse {
+}
+
+service AccessControlService {
+ rpc grant(GrantRequest)
+ returns (GrantResponse);
+
+ rpc revoke(RevokeRequest)
+ returns (RevokeResponse);
+
+ rpc getUserPermissions(UserPermissionsRequest)
+ returns (UserPermissionsResponse);
+
+ rpc checkPermissions(CheckPermissionsRequest)
+ returns (CheckPermissionsResponse);
+}
Modified: hbase/trunk/hbase-server/src/main/protobuf/Client.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/protobuf/Client.proto?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/protobuf/Client.proto (original)
+++ hbase/trunk/hbase-server/src/main/protobuf/Client.proto Tue Sep 18 06:32:57 2012
@@ -296,6 +296,23 @@ message ExecCoprocessorResponse {
required NameBytesPair value = 1;
}
+message CoprocessorServiceCall {
+ required bytes row = 1;
+ required string serviceName = 2;
+ required string methodName = 3;
+ required bytes request = 4;
+}
+
+message CoprocessorServiceRequest {
+ required RegionSpecifier region = 1;
+ required CoprocessorServiceCall call = 2;
+}
+
+message CoprocessorServiceResponse {
+ required RegionSpecifier region = 1;
+ required NameBytesPair value = 2;
+}
+
/**
* An action that is part of MultiRequest.
* This is a union type - exactly one of the fields will be set.
@@ -359,6 +376,9 @@ service ClientService {
rpc execCoprocessor(ExecCoprocessorRequest)
returns(ExecCoprocessorResponse);
+ rpc execService(CoprocessorServiceRequest)
+ returns(CoprocessorServiceResponse);
+
rpc multi(MultiRequest)
returns(MultiResponse);
}
Added: hbase/trunk/hbase-server/src/main/protobuf/Examples.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/protobuf/Examples.proto?rev=1387001&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/protobuf/Examples.proto (added)
+++ hbase/trunk/hbase-server/src/main/protobuf/Examples.proto Tue Sep 18 06:32:57 2012
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.hbase.coprocessor.example.generated";
+option java_outer_classname = "ExampleProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+message CountRequest {
+}
+
+message CountResponse {
+ required int64 count = 1 [default = 0];
+}
+
+service RowCountService {
+ rpc getRowCount(CountRequest)
+ returns (CountResponse);
+ rpc getKeyValueCount(CountRequest)
+ returns (CountResponse);
+}
\ No newline at end of file
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java Tue Sep 18 06:32:57 2012
@@ -46,13 +46,16 @@ implements ColumnAggregationProtocol {
.getRegion().getScanner(scan);
try {
List<KeyValue> curVals = new ArrayList<KeyValue>();
- boolean done = false;
+ boolean hasMore = false;
do {
curVals.clear();
- done = scanner.next(curVals);
- KeyValue kv = curVals.get(0);
- sumResult += Bytes.toInt(kv.getBuffer(), kv.getValueOffset());
- } while (done);
+ hasMore = scanner.next(curVals);
+ for (KeyValue kv : curVals) {
+ if (Bytes.equals(qualifier, kv.getQualifier())) {
+ sumResult += Bytes.toInt(kv.getBuffer(), kv.getValueOffset());
+ }
+ }
+ } while (hasMore);
} finally {
scanner.close();
}
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java?rev=1387001&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java Tue Sep 18 06:32:57 2012
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+
+import java.io.IOException;
+
+/**
+ * Test implementation of a coprocessor endpoint exposing the
+ * {@link TestRpcServiceProtos.TestProtobufRpcProto} service methods. For internal use by
+ * unit tests only.
+ */
+public class ProtobufCoprocessorService
+ extends TestRpcServiceProtos.TestProtobufRpcProto
+ implements CoprocessorService, Coprocessor {
+ public ProtobufCoprocessorService() {
+ }
+
+ @Override
+ public Service getService() {
+ return this;
+ }
+
+ @Override
+ public void ping(RpcController controller, TestProtos.EmptyRequestProto request,
+ RpcCallback<TestProtos.EmptyResponseProto> done) {
+ done.run(TestProtos.EmptyResponseProto.getDefaultInstance());
+ }
+
+ @Override
+ public void echo(RpcController controller, TestProtos.EchoRequestProto request,
+ RpcCallback<TestProtos.EchoResponseProto> done) {
+ String message = request.getMessage();
+ done.run(TestProtos.EchoResponseProto.newBuilder().setMessage(message).build());
+ }
+
+ @Override
+ public void error(RpcController controller, TestProtos.EmptyRequestProto request,
+ RpcCallback<TestProtos.EmptyResponseProto> done) {
+ ResponseConverter.setControllerException(controller, new IOException("Test exception"));
+ done.run(null);
+ }
+
+ @Override
+ public void start(CoprocessorEnvironment env) throws IOException {
+ // No-op: this test endpoint performs no work on start.
+ }
+
+ @Override
+ public void stop(CoprocessorEnvironment env) throws IOException {
+ // No-op: this test endpoint performs no work on stop.
+ }
+}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java Tue Sep 18 06:32:57 2012
@@ -18,43 +18,40 @@
*/
package org.apache.hadoop.hbase.coprocessor;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
import java.io.IOException;
+import java.util.Collections;
import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import com.google.protobuf.RpcController;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MediumTests;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.client.coprocessor.Exec;
-import org.apache.hadoop.hbase.io.HbaseObjectWritable;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import com.google.protobuf.ByteString;
+import static org.junit.Assert.*;
/**
* TestEndpoint: test cases to verify coprocessor Endpoint
*/
@Category(MediumTests.class)
public class TestCoprocessorEndpoint {
+ private static final Log LOG = LogFactory.getLog(TestCoprocessorEndpoint.class);
private static final byte[] TEST_TABLE = Bytes.toBytes("TestTable");
private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
@@ -76,27 +73,23 @@ public class TestCoprocessorEndpoint {
// set configure to indicate which cp should be loaded
Configuration conf = util.getConfiguration();
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
- "org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint",
- "org.apache.hadoop.hbase.coprocessor.GenericEndpoint");
+ org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(),
+ org.apache.hadoop.hbase.coprocessor.GenericEndpoint.class.getName(),
+ ProtobufCoprocessorService.class.getName());
util.startMiniCluster(2);
- HTable table = util.createTable(TEST_TABLE, TEST_FAMILY);
- util.createMultiRegions(util.getConfiguration(), table, TEST_FAMILY,
- new byte[][] { HConstants.EMPTY_BYTE_ARRAY,
- ROWS[rowSeperator1], ROWS[rowSeperator2] });
+ HBaseAdmin admin = new HBaseAdmin(conf);
+ HTableDescriptor desc = new HTableDescriptor(TEST_TABLE);
+ desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
+ admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]});
+ util.waitUntilAllRegionsAssigned(3);
+ admin.close();
+ HTable table = new HTable(conf, TEST_TABLE);
for (int i = 0; i < ROWSIZE; i++) {
Put put = new Put(ROWS[i]);
- put.setWriteToWAL(false);
put.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
table.put(put);
}
-
- // sleep here is an ugly hack to allow region transitions to finish
- long timeout = System.currentTimeMillis() + (15 * 1000);
- while ((System.currentTimeMillis() < timeout) &&
- (table.getRegionsInfo().size() != 2)) {
- Thread.sleep(250);
- }
table.close();
}
@@ -135,7 +128,7 @@ public class TestCoprocessorEndpoint {
table.close();
}
- @Ignore @Test
+ @Test
public void testAggregation() throws Throwable {
HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
Map<byte[], Long> results;
@@ -143,7 +136,7 @@ public class TestCoprocessorEndpoint {
// scan: for all regions
results = table
.coprocessorExec(ColumnAggregationProtocol.class,
- ROWS[rowSeperator1 - 1], ROWS[rowSeperator2 + 1],
+ ROWS[0], ROWS[ROWS.length-1],
new Batch.Call<ColumnAggregationProtocol, Long>() {
public Long call(ColumnAggregationProtocol instance)
throws IOException {
@@ -153,19 +146,20 @@ public class TestCoprocessorEndpoint {
int sumResult = 0;
int expectedResult = 0;
for (Map.Entry<byte[], Long> e : results.entrySet()) {
+ LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
sumResult += e.getValue();
}
for (int i = 0; i < ROWSIZE; i++) {
expectedResult += i;
}
- assertEquals("Invalid result", sumResult, expectedResult);
+ assertEquals("Invalid result", expectedResult, sumResult);
results.clear();
// scan: for region 2 and region 3
results = table
.coprocessorExec(ColumnAggregationProtocol.class,
- ROWS[rowSeperator1 + 1], ROWS[rowSeperator2 + 1],
+ ROWS[rowSeperator1], ROWS[ROWS.length-1],
new Batch.Call<ColumnAggregationProtocol, Long>() {
public Long call(ColumnAggregationProtocol instance)
throws IOException {
@@ -175,15 +169,90 @@ public class TestCoprocessorEndpoint {
sumResult = 0;
expectedResult = 0;
for (Map.Entry<byte[], Long> e : results.entrySet()) {
+ LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
sumResult += e.getValue();
}
for (int i = rowSeperator1; i < ROWSIZE; i++) {
expectedResult += i;
}
- assertEquals("Invalid result", sumResult, expectedResult);
+ assertEquals("Invalid result", expectedResult, sumResult);
table.close();
}
+ /**
+  * Calls the protobuf echo service first across all regions of the test table and
+  * then across the last two regions only, checking that every region involved
+  * echoes the request message back.
+  */
+ @Test
+ public void testCoprocessorService() throws Throwable {
+   HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
+   NavigableMap<HRegionInfo,ServerName> regions = table.getRegionLocations();
+
+   final TestProtos.EchoRequestProto request =
+       TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
+   final Map<byte[], String> results = Collections.synchronizedMap(
+       new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR));
+
+   // The per-region call is the same for both invocations below, so it is defined once.
+   final Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto> echoCall =
+       new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
+         public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance)
+             throws IOException {
+           LOG.debug("Default response is " + TestProtos.EchoResponseProto.getDefaultInstance());
+           // Use a fresh controller per call so failure state is never shared between regions.
+           ServerRpcController controller = new ServerRpcController();
+           BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
+               new BlockingRpcCallback<TestProtos.EchoResponseProto>();
+           instance.echo(controller, request, callback);
+           TestProtos.EchoResponseProto response = callback.get();
+           // Surface a server-side failure instead of silently returning the response.
+           if (controller.failedOnException()) {
+             throw controller.getFailedOn();
+           }
+           LOG.debug("Batch.Call returning result " + response);
+           return response;
+         }
+       };
+   // Records each region's echoed message, keyed by region name.
+   final Batch.Callback<TestProtos.EchoResponseProto> echoCallback =
+       new Batch.Callback<TestProtos.EchoResponseProto>() {
+         public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
+           assertNotNull(result);
+           assertEquals("hello", result.getMessage());
+           results.put(region, result.getMessage());
+         }
+       };
+
+   try {
+     // scan: for all regions
+     table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
+         ROWS[0], ROWS[ROWS.length - 1], echoCall, echoCallback);
+     for (Map.Entry<byte[], String> e : results.entrySet()) {
+       LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
+     }
+     assertEquals(3, results.size());
+     for (HRegionInfo info : regions.navigableKeySet()) {
+       LOG.info("Region info is "+info.getRegionNameAsString());
+       assertTrue(results.containsKey(info.getRegionName()));
+     }
+     results.clear();
+
+     // scan: for region 2 and region 3
+     table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
+         ROWS[rowSeperator1], ROWS[ROWS.length - 1], echoCall, echoCallback);
+     for (Map.Entry<byte[], String> e : results.entrySet()) {
+       LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
+     }
+     assertEquals(2, results.size());
+   } finally {
+     table.close();
+   }
+ }
+
private static byte[][] makeN(byte[] base, int n) {
byte[][] ret = new byte[n][];
for (int i = 0; i < n; i++) {
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRowCountEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRowCountEndpoint.java?rev=1387001&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRowCountEndpoint.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRowCountEndpoint.java Tue Sep 18 06:32:57 2012
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos;
+import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import static junit.framework.Assert.*;
+
+/**
+ * Test case demonstrating client interactions with the {@link RowCountEndpoint}
+ * sample coprocessor Service implementation.
+ */
+@Category(MediumTests.class)
+public class TestRowCountEndpoint {
+  private static final byte[] TEST_TABLE = Bytes.toBytes("testrowcounter");
+  private static final byte[] TEST_FAMILY = Bytes.toBytes("f");
+  private static final byte[] TEST_COLUMN = Bytes.toBytes("col");
+
+  private static HBaseTestingUtility TEST_UTIL = null;
+  private static Configuration CONF = null;
+
+  /** Starts a mini cluster with {@link RowCountEndpoint} loaded and creates the test table. */
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility();
+    CONF = TEST_UTIL.getConfiguration();
+    CONF.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+        RowCountEndpoint.class.getName());
+
+    TEST_UTIL.startMiniCluster();
+    TEST_UTIL.createTable(TEST_TABLE, TEST_FAMILY);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /** Writes five rows, then asks the row-count service for the count and expects 5. */
+  @Test
+  public void testEndpoint() throws Throwable {
+    HTable table = new HTable(CONF, TEST_TABLE);
+    try {
+      // insert some test rows
+      for (int i=0; i<5; i++) {
+        byte[] iBytes = Bytes.toBytes(i);
+        Put p = new Put(iBytes);
+        p.add(TEST_FAMILY, TEST_COLUMN, iBytes);
+        table.put(p);
+      }
+
+      final ExampleProtos.CountRequest request = ExampleProtos.CountRequest.getDefaultInstance();
+      Map<byte[],Long> results = table.coprocessorService(ExampleProtos.RowCountService.class,
+          null, null,
+          new Batch.Call<ExampleProtos.RowCountService,Long>() {
+            public Long call(ExampleProtos.RowCountService counter) throws IOException {
+              ServerRpcController controller = new ServerRpcController();
+              BlockingRpcCallback<ExampleProtos.CountResponse> rpcCallback =
+                  new BlockingRpcCallback<ExampleProtos.CountResponse>();
+              counter.getRowCount(controller, request, rpcCallback);
+              ExampleProtos.CountResponse response = rpcCallback.get();
+              // Propagate a failure recorded on the controller rather than returning a count.
+              if (controller.failedOnException()) {
+                throw controller.getFailedOn();
+              }
+              return (response != null && response.hasCount()) ? response.getCount() : 0;
+            }
+          });
+      // should be one region with results
+      assertEquals(1, results.size());
+      Iterator<Long> iter = results.values().iterator();
+      Long val = iter.next();
+      assertNotNull(val);
+      assertEquals(5L, val.longValue());
+    } finally {
+      // Always close the table so the test does not leave it open.
+      table.close();
+    }
+  }
+
+  @org.junit.Rule
+  public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
+      new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
+}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java Tue Sep 18 06:32:57 2012
@@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
@@ -413,6 +414,12 @@ class MockRegionServer implements AdminP
}
@Override
+ public ClientProtos.CoprocessorServiceResponse execService(RpcController controller,
+ ClientProtos.CoprocessorServiceRequest request) throws ServiceException {
+ return null; // stub: this mock always returns null for coprocessor service calls
+ }
+
+ @Override
public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse multi(
RpcController controller, MultiRequest request) throws ServiceException {
// TODO Auto-generated method stub
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java Tue Sep 18 06:32:57 2012
@@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.security.access;
+import static org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
+import static org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.CheckPermissionsRequest;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -27,6 +29,11 @@ import java.security.PrivilegedException
import java.util.List;
import java.util.Map;
+import com.google.common.collect.Lists;
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -52,6 +59,8 @@ import org.apache.hadoop.hbase.coprocess
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.security.AccessDeniedException;
@@ -128,26 +137,32 @@ public class TestAccessController {
// initilize access control
HTable meta = new HTable(conf, AccessControlLists.ACL_TABLE_NAME);
- AccessControllerProtocol protocol = meta.coprocessorProxy(AccessControllerProtocol.class,
- TEST_TABLE);
+ BlockingRpcChannel service = meta.coprocessorService(TEST_TABLE);
+ AccessControlService.BlockingInterface protocol =
+ AccessControlService.newBlockingStub(service);
HRegion region = TEST_UTIL.getHBaseCluster().getRegions(TEST_TABLE).get(0);
RegionCoprocessorHost rcpHost = region.getCoprocessorHost();
RCP_ENV = rcpHost.createEnvironment(AccessController.class, ACCESS_CONTROLLER,
Coprocessor.PRIORITY_HIGHEST, 1, conf);
- protocol.grant(new UserPermission(Bytes.toBytes(USER_ADMIN.getShortName()),
- Permission.Action.ADMIN, Permission.Action.CREATE, Permission.Action.READ,
- Permission.Action.WRITE));
-
- protocol.grant(new UserPermission(Bytes.toBytes(USER_RW.getShortName()), TEST_TABLE,
- TEST_FAMILY, Permission.Action.READ, Permission.Action.WRITE));
+ protocol.grant(null, newGrantRequest(USER_ADMIN.getShortName(),
+ null, null, null,
+ AccessControlProtos.Permission.Action.ADMIN,
+ AccessControlProtos.Permission.Action.CREATE,
+ AccessControlProtos.Permission.Action.READ,
+ AccessControlProtos.Permission.Action.WRITE));
+
+ protocol.grant(null, newGrantRequest(USER_RW.getShortName(),
+ TEST_TABLE, TEST_FAMILY, null,
+ AccessControlProtos.Permission.Action.READ,
+ AccessControlProtos.Permission.Action.WRITE));
- protocol.grant(new UserPermission(Bytes.toBytes(USER_RO.getShortName()), TEST_TABLE,
- TEST_FAMILY, Permission.Action.READ));
+ protocol.grant(null, newGrantRequest(USER_RO.getShortName(), TEST_TABLE,
+ TEST_FAMILY, null, AccessControlProtos.Permission.Action.READ));
- protocol.grant(new UserPermission(Bytes.toBytes(USER_CREATE.getShortName()), TEST_TABLE, null,
- Permission.Action.CREATE));
+ protocol.grant(null, newGrantRequest(USER_CREATE.getShortName(),
+ TEST_TABLE, null, null, AccessControlProtos.Permission.Action.CREATE));
}
@AfterClass
@@ -155,6 +170,32 @@ public class TestAccessController {
TEST_UTIL.shutdownMiniCluster();
}
+ /**
+  * Assembles a {@code GrantRequest} that grants {@code actions} to {@code username}.
+  * The table, family and qualifier are each set on the permission only when non-null.
+  */
+ private static AccessControlProtos.GrantRequest newGrantRequest(
+     String username, byte[] table, byte[] family, byte[] qualifier,
+     AccessControlProtos.Permission.Action... actions) {
+   AccessControlProtos.Permission.Builder perm = AccessControlProtos.Permission.newBuilder();
+   for (int i = 0; i < actions.length; i++) {
+     perm.addAction(actions[i]);
+   }
+   if (table != null) {
+     perm.setTable(ByteString.copyFrom(table));
+   }
+   if (family != null) {
+     perm.setFamily(ByteString.copyFrom(family));
+   }
+   if (qualifier != null) {
+     perm.setQualifier(ByteString.copyFrom(qualifier));
+   }
+
+   // Pair the user with the finished permission, then wrap both in the request.
+   AccessControlProtos.UserPermission.Builder userPerm =
+       AccessControlProtos.UserPermission.newBuilder()
+           .setUser(ByteString.copyFromUtf8(username))
+           .setPermission(perm.build());
+   return AccessControlProtos.GrantRequest.newBuilder().setPermission(userPerm).build();
+ }
+
public void verifyAllowed(User user, PrivilegedExceptionAction... actions) throws Exception {
for (PrivilegedExceptionAction action : actions) {
try {
@@ -182,7 +223,13 @@ public class TestAccessController {
// AccessDeniedException
boolean isAccessDeniedException = false;
for (Throwable ex : e.getCauses()) {
- if (ex instanceof AccessDeniedException) {
+ if (ex instanceof ServiceException) {
+ ServiceException se = (ServiceException)ex;
+ if (se.getCause() != null && se.getCause() instanceof AccessDeniedException) {
+ isAccessDeniedException = true;
+ break;
+ }
+ } else if (ex instanceof AccessDeniedException) {
isAccessDeniedException = true;
break;
}
@@ -1117,15 +1164,25 @@ public class TestAccessController {
public void checkGlobalPerms(Permission.Action... actions) throws IOException {
HTable acl = new HTable(conf, AccessControlLists.ACL_TABLE_NAME);
- AccessControllerProtocol protocol = acl.coprocessorProxy(AccessControllerProtocol.class,
- new byte[0]);
+ BlockingRpcChannel channel = acl.coprocessorService(new byte[0]);
+ AccessControlService.BlockingInterface protocol =
+ AccessControlService.newBlockingStub(channel);
+ // NOTE(review): perms below is no longer read once the request is built from actions.
Permission[] perms = new Permission[actions.length];
for (int i = 0; i < actions.length; i++) {
perms[i] = new Permission(actions[i]);
}
- protocol.checkPermissions(perms);
+ CheckPermissionsRequest.Builder request = CheckPermissionsRequest.newBuilder();
+ for (Action a : actions) {
+ request.addPermission(AccessControlProtos.Permission.newBuilder()
+ .addAction(ProtobufUtil.toPermissionAction(a)).build());
+ }
+ try {
+ protocol.checkPermissions(null, request.build());
+ } catch (ServiceException se) {
+ // NOTE(review): any value returned by toIOException is ignored; presumably it throws - confirm.
+ ProtobufUtil.toIOException(se);
+ } finally {
+ // Close the table whether or not the permission check succeeded.
+ acl.close();
+ }
}
public void checkTablePerms(byte[] table, byte[] family, byte[] column,
@@ -1140,22 +1197,39 @@ public class TestAccessController {
public void checkTablePerms(byte[] table, Permission... perms) throws IOException {
HTable acl = new HTable(conf, table);
- AccessControllerProtocol protocol = acl.coprocessorProxy(AccessControllerProtocol.class,
- new byte[0]);
-
- protocol.checkPermissions(perms);
+ AccessControlService.BlockingInterface protocol =
+ AccessControlService.newBlockingStub(acl.coprocessorService(new byte[0]));
+ CheckPermissionsRequest.Builder request = CheckPermissionsRequest.newBuilder();
+ for (Permission p : perms) {
+ request.addPermission(ProtobufUtil.toPermission(p));
+ }
+ try {
+ protocol.checkPermissions(null, request.build());
+ } catch (ServiceException se) {
+ // NOTE(review): any value returned by toIOException is ignored; presumably it throws - confirm.
+ ProtobufUtil.toIOException(se);
+ } finally {
+ // Close the table whether or not the permission check succeeded.
+ acl.close();
+ }
}
- public void grant(AccessControllerProtocol protocol, User user, byte[] t, byte[] f, byte[] q,
- Permission.Action... actions) throws IOException {
- protocol.grant(new UserPermission(Bytes.toBytes(user.getShortName()), t, f, q, actions));
+ public void grant(AccessControlService.BlockingInterface protocol, User user,
+ byte[] t, byte[] f, byte[] q, Permission.Action... actions)
+ throws ServiceException {
+ List<AccessControlProtos.Permission.Action> permActions =
+ Lists.newArrayListWithCapacity(actions.length);
+ for (Action a : actions) { // map each action to its AccessControlProtos counterpart
+ permActions.add(ProtobufUtil.toPermissionAction(a));
+ }
+ AccessControlProtos.GrantRequest request =
+ newGrantRequest(user.getShortName(), t, f, q, permActions.toArray(
+ new AccessControlProtos.Permission.Action[actions.length]));
+ protocol.grant(null, request); // null controller, as in the other grant calls in this test
+ }
@Test
public void testCheckPermissions() throws Exception {
final HTable acl = new HTable(conf, AccessControlLists.ACL_TABLE_NAME);
- final AccessControllerProtocol protocol = acl.coprocessorProxy(AccessControllerProtocol.class,
- TEST_TABLE);
+ BlockingRpcChannel channel = acl.coprocessorService(new byte[0]);
+ AccessControlService.BlockingInterface protocol =
+ AccessControlService.newBlockingStub(channel);
// --------------------------------------
// test global permissions
@@ -1278,11 +1352,15 @@ public class TestAccessController {
// --------------------------------------
// check for wrong table region
try {
+ CheckPermissionsRequest checkRequest =
+ CheckPermissionsRequest.newBuilder().addPermission(
+ AccessControlProtos.Permission.newBuilder()
+ .setTable(ByteString.copyFrom(TEST_TABLE)).addAction(AccessControlProtos.Permission.Action.CREATE)
+ ).build();
// but ask for TablePermissions for TEST_TABLE
- protocol.checkPermissions(new Permission[] { (Permission) new TablePermission(TEST_TABLE,
- null, (byte[]) null, Permission.Action.CREATE) });
+ protocol.checkPermissions(null, checkRequest);
fail("this should have thrown CoprocessorException");
- } catch (CoprocessorException ex) {
+ } catch (ServiceException ex) {
// expected
}