You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ga...@apache.org on 2012/09/18 08:32:58 UTC

svn commit: r1387001 [5/5] - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/client/coprocessor/ main/java/org/apache/hadoop/hbase/coprocessor/ main/java/org/apache/hadoop/hbase/coprocessor/...

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Sep 18 06:32:57 2012
@@ -61,6 +61,7 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.google.protobuf.*;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -115,6 +116,7 @@ import org.apache.hadoop.hbase.ipc.HBase
 import org.apache.hadoop.hbase.ipc.RpcCallContext;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.metrics.OperationMetrics;
 import org.apache.hadoop.hbase.regionserver.metrics.RegionMetricsStorage;
@@ -143,6 +145,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.MutableClassToInstanceMap;
 
+import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
+
 /**
  * HRegion stores data for a certain region of a table.  It stores all columns
  * for each row. A given table consists of one or more HRegions.
@@ -215,6 +219,9 @@ public class HRegion implements HeapSize
   private Map<String, Class<? extends CoprocessorProtocol>>
       protocolHandlerNames = Maps.newHashMap();
 
+  // TODO: account for each registered handler in HeapSize computation
+  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
+
   /**
    * Temporary subdirectory of the region directory used for compaction output.
    */
@@ -5027,7 +5034,7 @@ public class HRegion implements HeapSize
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      36 * ClassSize.REFERENCE + Bytes.SIZEOF_INT +
+      37 * ClassSize.REFERENCE + Bytes.SIZEOF_INT +
       (7 * Bytes.SIZEOF_LONG) +
       Bytes.SIZEOF_BOOLEAN);
 
@@ -5085,6 +5092,7 @@ public class HRegion implements HeapSize
    * @return {@code true} if the registration was successful, {@code false}
    * otherwise
    */
+  @Deprecated
   public <T extends CoprocessorProtocol> boolean registerProtocol(
       Class<T> protocol, T handler) {
 
@@ -5109,6 +5117,41 @@ public class HRegion implements HeapSize
   }
 
   /**
+   * Registers a new protocol buffer {@link Service} subclass as a coprocessor endpoint to
+   * be available for handling
+   * {@link HRegion#execService(com.google.protobuf.RpcController, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall)}} calls.
+   *
+   * <p>
+   * Only a single instance may be registered per region for a given {@link Service} subclass (the
+   * instances are keyed on {@link com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}.
+   * After the first registration, subsequent calls with the same service name will fail with
+   * a return value of {@code false}.
+   * </p>
+   * @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint
+   * @return {@code true} if the registration was successful, {@code false}
+   * otherwise
+   */
+  public boolean registerService(Service instance) {
+    /*
+     * No stacking of instances is allowed for a single service name
+     */
+    Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
+    if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
+      LOG.error("Coprocessor service "+serviceDesc.getFullName()+
+          " already registered, rejecting request from "+instance
+      );
+      return false;
+    }
+
+    coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Registered coprocessor service: region="+
+          Bytes.toStringBinary(getRegionName())+" service="+serviceDesc.getFullName());
+    }
+    return true;
+  }
+
+  /**
    * Executes a single {@link org.apache.hadoop.hbase.ipc.CoprocessorProtocol}
    * method using the registered protocol handlers.
    * {@link CoprocessorProtocol} implementations must be registered via the
@@ -5123,6 +5166,7 @@ public class HRegion implements HeapSize
    *     occurs during the invocation
    * @see org.apache.hadoop.hbase.regionserver.HRegion#registerProtocol(Class, org.apache.hadoop.hbase.ipc.CoprocessorProtocol)
    */
+  @Deprecated
   public ExecResult exec(Exec call)
       throws IOException {
     Class<? extends CoprocessorProtocol> protocol = call.getProtocol();
@@ -5174,6 +5218,55 @@ public class HRegion implements HeapSize
     return new ExecResult(getRegionName(), value);
   }
 
+  /**
+   * Executes a single protocol buffer coprocessor endpoint {@link Service} method using
+   * the registered protocol handlers.  {@link Service} implementations must be registered via the
+   * {@link org.apache.hadoop.hbase.regionserver.HRegion#registerService(com.google.protobuf.Service)}
+   * method before they are available.
+   *
+   * @param controller an {@code RpcContoller} implementation to pass to the invoked service
+   * @param call a {@code CoprocessorServiceCall} instance identifying the service, method,
+   *     and parameters for the method invocation
+   * @return a protocol buffer {@code Message} instance containing the method's result
+   * @throws IOException if no registered service handler is found or an error
+   *     occurs during the invocation
+   * @see org.apache.hadoop.hbase.regionserver.HRegion#registerService(com.google.protobuf.Service)
+   */
+  public Message execService(RpcController controller, CoprocessorServiceCall call)
+      throws IOException {
+    String serviceName = call.getServiceName();
+    String methodName = call.getMethodName();
+    if (!coprocessorServiceHandlers.containsKey(serviceName)) {
+      throw new HBaseRPC.UnknownProtocolException(null,
+          "No registered coprocessor service found for name "+serviceName+
+          " in region "+Bytes.toStringBinary(getRegionName()));
+    }
+
+    Service service = coprocessorServiceHandlers.get(serviceName);
+    Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
+    Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
+    if (methodDesc == null) {
+      throw new HBaseRPC.UnknownProtocolException(service.getClass(),
+          "Unknown method "+methodName+" called on service "+serviceName+
+              " in region "+Bytes.toStringBinary(getRegionName()));
+    }
+
+    Message request = service.getRequestPrototype(methodDesc).newBuilderForType()
+        .mergeFrom(call.getRequest()).build();
+    final Message.Builder responseBuilder =
+        service.getResponsePrototype(methodDesc).newBuilderForType();
+    service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
+      @Override
+      public void run(Message message) {
+        if (message != null) {
+          responseBuilder.mergeFrom(message);
+        }
+      }
+    });
+
+    return responseBuilder.build();
+  }
+
   /*
    * Process table.
    * Do major compaction or list content.

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Sep 18 06:32:57 2012
@@ -54,6 +54,7 @@ import java.util.concurrent.locks.Reentr
 
 import javax.management.ObjectName;
 
+import com.google.protobuf.Message;
 import org.apache.commons.lang.mutable.MutableDouble;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -115,6 +116,7 @@ import org.apache.hadoop.hbase.ipc.HBase
 import org.apache.hadoop.hbase.ipc.ProtocolSignature;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
@@ -230,6 +232,9 @@ import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
 import com.google.protobuf.RpcController;
 
+import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
+import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
+
 /**
  * HRegionServer makes a set of HRegions available to clients. It checks in with
  * the HMaster. There are many HRegionServers in a single HBase deployment.
@@ -3333,6 +3338,31 @@ public class  HRegionServer implements C
     }
   }
 
+  @Override
+  public CoprocessorServiceResponse execService(final RpcController controller,
+      final CoprocessorServiceRequest request) throws ServiceException {
+    try {
+      requestCount.incrementAndGet();
+      HRegion region = getRegion(request.getRegion());
+      // ignore the passed in controller (from the serialized call)
+      ServerRpcController execController = new ServerRpcController();
+      Message result = region.execService(execController, request.getCall());
+      if (execController.getFailedOn() != null) {
+        throw execController.getFailedOn();
+      }
+      CoprocessorServiceResponse.Builder builder =
+          CoprocessorServiceResponse.newBuilder();
+      builder.setRegion(RequestConverter.buildRegionSpecifier(
+          RegionSpecifierType.REGION_NAME, region.getRegionName()));
+      builder.setValue(
+          builder.getValueBuilder().setName(result.getClass().getName())
+              .setValue(result.toByteString()));
+      return builder.build();
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+  }
+
   /**
    * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
    *

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Tue Sep 18 06:32:57 2012
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
@@ -216,7 +217,11 @@ public class RegionCoprocessorHost
     for (Class c : implClass.getInterfaces()) {
       if (CoprocessorProtocol.class.isAssignableFrom(c)) {
         region.registerProtocol(c, (CoprocessorProtocol)instance);
-        break;
+      }
+      // we allow endpoints to register as both CoproocessorProtocols and Services
+      // for ease of transition
+      if (CoprocessorService.class.isAssignableFrom(c)) {
+        region.registerService( ((CoprocessorService)instance).getService() );
       }
     }
     ConcurrentMap<String, Object> classData;

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java Tue Sep 18 06:32:57 2012
@@ -28,12 +28,15 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.util.StringUtils;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -733,6 +736,25 @@ public class RemoteHTable implements HTa
   }
 
   @Override
+  public CoprocessorRpcChannel coprocessorService(byte[] row) {
+    throw new UnsupportedOperationException("coprocessorService not implemented");
+  }
+
+  @Override
+  public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
+      byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
+      throws ServiceException, Throwable {
+    throw new UnsupportedOperationException("coprocessorService not implemented");
+  }
+
+  @Override
+  public <T extends Service, R> void coprocessorService(Class<T> service,
+      byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback)
+      throws ServiceException, Throwable {
+    throw new UnsupportedOperationException("coprocessorService not implemented");
+  }
+
+  @Override
   public void mutateRow(RowMutations rm) throws IOException {
     throw new IOException("atomicMutation not supported");
   }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java Tue Sep 18 06:32:57 2012
@@ -24,6 +24,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -40,18 +43,16 @@ import org.apache.hadoop.hbase.client.In
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
-import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.MasterObserver;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.*;
 import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.ProtocolSignature;
 import org.apache.hadoop.hbase.ipc.RequestContext;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -69,6 +70,8 @@ import com.google.common.collect.MapMake
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import static org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
+
 /**
  * Provides basic authorization checks for data access and administrative
  * operations.
@@ -101,7 +104,8 @@ import com.google.common.collect.Sets;
  * </p>
  */
 public class AccessController extends BaseRegionObserver
-    implements MasterObserver, AccessControllerProtocol {
+    implements MasterObserver, AccessControllerProtocol,
+    AccessControlService.Interface, CoprocessorService {
   /**
    * Represents the result of an authorization check for logging and error
    * reporting.
@@ -1049,6 +1053,7 @@ public class AccessController extends Ba
    * These methods are only allowed to be called against the _acl_ region(s).
    * This will be restricted by both client side and endpoint implementations.
    */
+  @Deprecated
   @Override
   public void grant(UserPermission perm) throws IOException {
     // verify it's only running at .acl.
@@ -1079,6 +1084,7 @@ public class AccessController extends Ba
             permission.getActions()));
   }
 
+  @Deprecated
   @Override
   public void revoke(UserPermission perm) throws IOException {
     // only allowed to be called on _acl_ region
@@ -1109,6 +1115,7 @@ public class AccessController extends Ba
             permission.getActions()));
   }
 
+  @Deprecated
   @Override
   public List<UserPermission> getUserPermissions(final byte[] tableName) throws IOException {
     // only allowed to be called on _acl_ region
@@ -1124,6 +1131,7 @@ public class AccessController extends Ba
     }
   }
 
+  @Deprecated
   @Override
   public void checkPermissions(Permission[] permissions) throws IOException {
     byte[] tableName = regionEnv.getRegion().getTableDesc().getName();
@@ -1158,11 +1166,13 @@ public class AccessController extends Ba
     }
   }
 
+  @Deprecated
   @Override
   public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
     return PROTOCOL_VERSION;
   }
 
+  @Deprecated
   @Override
   public ProtocolSignature getProtocolSignature(String protocol,
       long clientVersion, int clientMethodsHash) throws IOException {
@@ -1173,6 +1183,79 @@ public class AccessController extends Ba
         "Unexpected protocol requested: "+protocol);
   }
 
+
+  /* ---- Protobuf AccessControlService implementation ---- */
+  @Override
+  public void grant(RpcController controller,
+                    AccessControlProtos.GrantRequest request,
+                    RpcCallback<AccessControlProtos.GrantResponse> done) {
+    UserPermission perm = ProtobufUtil.toUserPermission(request.getPermission());
+    AccessControlProtos.GrantResponse response = null;
+    try {
+      grant(perm);
+      response = AccessControlProtos.GrantResponse.getDefaultInstance();
+    } catch (IOException ioe) {
+      // pass exception back up
+      ResponseConverter.setControllerException(controller, ioe);
+    }
+    done.run(response);
+  }
+
+  @Override
+  public void revoke(RpcController controller,
+                     AccessControlProtos.RevokeRequest request,
+                     RpcCallback<AccessControlProtos.RevokeResponse> done) {
+    UserPermission perm = ProtobufUtil.toUserPermission(request.getPermission());
+    AccessControlProtos.RevokeResponse response = null;
+    try {
+      revoke(perm);
+      response = AccessControlProtos.RevokeResponse.getDefaultInstance();
+    } catch (IOException ioe) {
+      // pass exception back up
+      ResponseConverter.setControllerException(controller, ioe);
+    }
+    done.run(response);
+  }
+
+  @Override
+  public void getUserPermissions(RpcController controller,
+                                 AccessControlProtos.UserPermissionsRequest request,
+                                 RpcCallback<AccessControlProtos.UserPermissionsResponse> done) {
+    byte[] table = request.getTable().toByteArray();
+    AccessControlProtos.UserPermissionsResponse response = null;
+    try {
+      List<UserPermission> perms = getUserPermissions(table);
+      response = ResponseConverter.buildUserPermissionsResponse(perms);
+    } catch (IOException ioe) {
+      // pass exception back up
+      ResponseConverter.setControllerException(controller, ioe);
+    }
+    done.run(response);
+  }
+
+  @Override
+  public void checkPermissions(RpcController controller,
+                               AccessControlProtos.CheckPermissionsRequest request,
+                               RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
+    Permission[] perms = new Permission[request.getPermissionCount()];
+    for (int i=0; i < request.getPermissionCount(); i++) {
+      perms[i] = ProtobufUtil.toPermission(request.getPermission(i));
+    }
+    AccessControlProtos.CheckPermissionsResponse response = null;
+    try {
+      checkPermissions(perms);
+      response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
+    } catch (IOException ioe) {
+      ResponseConverter.setControllerException(controller, ioe);
+    }
+    done.run(response);
+  }
+
+  @Override
+  public Service getService() {
+    return AccessControlProtos.AccessControlService.newReflectiveService(this);
+  }
+
   private byte[] getTableName(RegionCoprocessorEnvironment e) {
     HRegion region = e.getRegion();
     byte[] tableName = null;

Modified: hbase/trunk/hbase-server/src/main/protobuf/AccessControl.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/protobuf/AccessControl.proto?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/protobuf/AccessControl.proto (original)
+++ hbase/trunk/hbase-server/src/main/protobuf/AccessControl.proto Tue Sep 18 06:32:57 2012
@@ -52,3 +52,48 @@ message UserTablePermissions {
 
   repeated UserPermissions permissions = 1;
 }
+
+message GrantRequest {
+    required UserPermission permission = 1;
+}
+
+message GrantResponse {
+}
+
+message RevokeRequest {
+    required UserPermission permission = 1;
+
+}
+
+message RevokeResponse {
+}
+
+
+message UserPermissionsRequest {
+    required bytes table = 1;
+}
+
+message UserPermissionsResponse {
+    repeated UserPermission permission = 1;
+}
+
+message CheckPermissionsRequest {
+    repeated Permission permission = 1;
+}
+
+message CheckPermissionsResponse {
+}
+
+service AccessControlService {
+    rpc grant(GrantRequest)
+      returns (GrantResponse);
+
+    rpc revoke(RevokeRequest)
+      returns (RevokeResponse);
+
+    rpc getUserPermissions(UserPermissionsRequest)
+      returns (UserPermissionsResponse);
+
+    rpc checkPermissions(CheckPermissionsRequest)
+      returns (CheckPermissionsResponse);
+}

Modified: hbase/trunk/hbase-server/src/main/protobuf/Client.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/protobuf/Client.proto?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/protobuf/Client.proto (original)
+++ hbase/trunk/hbase-server/src/main/protobuf/Client.proto Tue Sep 18 06:32:57 2012
@@ -296,6 +296,23 @@ message ExecCoprocessorResponse {
   required NameBytesPair value = 1;
 }
 
+message CoprocessorServiceCall {
+  required bytes row = 1;
+  required string serviceName = 2;
+  required string methodName = 3;
+  required bytes request = 4;
+}
+
+message CoprocessorServiceRequest {
+  required RegionSpecifier region = 1;
+  required CoprocessorServiceCall call = 2;
+}
+
+message CoprocessorServiceResponse {
+  required RegionSpecifier region = 1;
+  required NameBytesPair value = 2;
+}
+
 /**
  * An action that is part of MultiRequest.
  * This is a union type - exactly one of the fields will be set.
@@ -359,6 +376,9 @@ service ClientService {
   rpc execCoprocessor(ExecCoprocessorRequest)
     returns(ExecCoprocessorResponse);
 
+  rpc execService(CoprocessorServiceRequest)
+    returns(CoprocessorServiceResponse);
+
   rpc multi(MultiRequest)
     returns(MultiResponse);
 }

Added: hbase/trunk/hbase-server/src/main/protobuf/Examples.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/protobuf/Examples.proto?rev=1387001&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/protobuf/Examples.proto (added)
+++ hbase/trunk/hbase-server/src/main/protobuf/Examples.proto Tue Sep 18 06:32:57 2012
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.hbase.coprocessor.example.generated";
+option java_outer_classname = "ExampleProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+message CountRequest {
+}
+
+message CountResponse {
+  required int64 count = 1 [default = 0];
+}
+
+service RowCountService {
+  rpc getRowCount(CountRequest)
+    returns (CountResponse);
+  rpc getKeyValueCount(CountRequest)
+    returns (CountResponse);
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java Tue Sep 18 06:32:57 2012
@@ -46,13 +46,16 @@ implements ColumnAggregationProtocol {
         .getRegion().getScanner(scan);
     try {
       List<KeyValue> curVals = new ArrayList<KeyValue>();
-      boolean done = false;
+      boolean hasMore = false;
       do {
         curVals.clear();
-        done = scanner.next(curVals);
-        KeyValue kv = curVals.get(0);
-        sumResult += Bytes.toInt(kv.getBuffer(), kv.getValueOffset());
-      } while (done);
+        hasMore = scanner.next(curVals);
+        for (KeyValue kv : curVals) {
+          if (Bytes.equals(qualifier, kv.getQualifier())) {
+            sumResult += Bytes.toInt(kv.getBuffer(), kv.getValueOffset());
+          }
+        }
+      } while (hasMore);
     } finally {
       scanner.close();
     }

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java?rev=1387001&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java Tue Sep 18 06:32:57 2012
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+
+import java.io.IOException;
+
+/**
+ * Test implementation of a coprocessor endpoint exposing the
+ * {@link TestRpcServiceProtos.TestProtobufRpcProto} service methods.  For internal use by
+ * unit tests only.
+ */
+public class ProtobufCoprocessorService
+    extends TestRpcServiceProtos.TestProtobufRpcProto
+    implements CoprocessorService, Coprocessor {
+  public ProtobufCoprocessorService() {
+  }
+
+  @Override
+  public Service getService() {
+    return this;
+  }
+
+  @Override
+  public void ping(RpcController controller, TestProtos.EmptyRequestProto request,
+                   RpcCallback<TestProtos.EmptyResponseProto> done) {
+    done.run(TestProtos.EmptyResponseProto.getDefaultInstance());
+  }
+
+  @Override
+  public void echo(RpcController controller, TestProtos.EchoRequestProto request,
+                   RpcCallback<TestProtos.EchoResponseProto> done) {
+    String message = request.getMessage();
+    done.run(TestProtos.EchoResponseProto.newBuilder().setMessage(message).build());
+  }
+
+  @Override
+  public void error(RpcController controller, TestProtos.EmptyRequestProto request,
+                    RpcCallback<TestProtos.EmptyResponseProto> done) {
+    ResponseConverter.setControllerException(controller, new IOException("Test exception"));
+    done.run(null);
+  }
+
+  @Override
+  public void start(CoprocessorEnvironment env) throws IOException {
+    //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public void stop(CoprocessorEnvironment env) throws IOException {
+    //To change body of implemented methods use File | Settings | File Templates.
+  }
+}

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java Tue Sep 18 06:32:57 2012
@@ -18,43 +18,40 @@
  */
 package org.apache.hadoop.hbase.coprocessor;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
 
+import com.google.protobuf.RpcController;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MediumTests;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.client.coprocessor.Exec;
-import org.apache.hadoop.hbase.io.HbaseObjectWritable;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import com.google.protobuf.ByteString;
+import static org.junit.Assert.*;
 
 /**
  * TestEndpoint: test cases to verify coprocessor Endpoint
  */
 @Category(MediumTests.class)
 public class TestCoprocessorEndpoint {
+  private static final Log LOG = LogFactory.getLog(TestCoprocessorEndpoint.class);
 
   private static final byte[] TEST_TABLE = Bytes.toBytes("TestTable");
   private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
@@ -76,27 +73,23 @@ public class TestCoprocessorEndpoint {
     // set configure to indicate which cp should be loaded
     Configuration conf = util.getConfiguration();
     conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
-        "org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint",
-        "org.apache.hadoop.hbase.coprocessor.GenericEndpoint");
+        org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(),
+        org.apache.hadoop.hbase.coprocessor.GenericEndpoint.class.getName(),
+        ProtobufCoprocessorService.class.getName());
     util.startMiniCluster(2);
-    HTable table = util.createTable(TEST_TABLE, TEST_FAMILY);
-    util.createMultiRegions(util.getConfiguration(), table, TEST_FAMILY,
-                            new byte[][] { HConstants.EMPTY_BYTE_ARRAY,
-                                ROWS[rowSeperator1], ROWS[rowSeperator2] });
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    HTableDescriptor desc = new HTableDescriptor(TEST_TABLE);
+    desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
+    admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]});
+    util.waitUntilAllRegionsAssigned(3);
+    admin.close();
 
+    HTable table = new HTable(conf, TEST_TABLE);
     for (int i = 0; i < ROWSIZE; i++) {
       Put put = new Put(ROWS[i]);
-      put.setWriteToWAL(false);
       put.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
       table.put(put);
     }
-
-    // sleep here is an ugly hack to allow region transitions to finish
-    long timeout = System.currentTimeMillis() + (15 * 1000);
-    while ((System.currentTimeMillis() < timeout) &&
-      (table.getRegionsInfo().size() != 2)) {
-      Thread.sleep(250);
-    }
     table.close();
   }
 
@@ -135,7 +128,7 @@ public class TestCoprocessorEndpoint {
     table.close();
   }
 
-  @Ignore @Test
+  @Test
   public void testAggregation() throws Throwable {
     HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
     Map<byte[], Long> results;
@@ -143,7 +136,7 @@ public class TestCoprocessorEndpoint {
     // scan: for all regions
     results = table
         .coprocessorExec(ColumnAggregationProtocol.class,
-                         ROWS[rowSeperator1 - 1], ROWS[rowSeperator2 + 1],
+                         ROWS[0], ROWS[ROWS.length-1],
                          new Batch.Call<ColumnAggregationProtocol, Long>() {
                            public Long call(ColumnAggregationProtocol instance)
                                throws IOException {
@@ -153,19 +146,20 @@ public class TestCoprocessorEndpoint {
     int sumResult = 0;
     int expectedResult = 0;
     for (Map.Entry<byte[], Long> e : results.entrySet()) {
+      LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
       sumResult += e.getValue();
     }
     for (int i = 0; i < ROWSIZE; i++) {
       expectedResult += i;
     }
-    assertEquals("Invalid result", sumResult, expectedResult);
+    assertEquals("Invalid result", expectedResult, sumResult);
 
     results.clear();
 
     // scan: for region 2 and region 3
     results = table
         .coprocessorExec(ColumnAggregationProtocol.class,
-                         ROWS[rowSeperator1 + 1], ROWS[rowSeperator2 + 1],
+                         ROWS[rowSeperator1], ROWS[ROWS.length-1],
                          new Batch.Call<ColumnAggregationProtocol, Long>() {
                            public Long call(ColumnAggregationProtocol instance)
                                throws IOException {
@@ -175,15 +169,90 @@ public class TestCoprocessorEndpoint {
     sumResult = 0;
     expectedResult = 0;
     for (Map.Entry<byte[], Long> e : results.entrySet()) {
+      LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
       sumResult += e.getValue();
     }
     for (int i = rowSeperator1; i < ROWSIZE; i++) {
       expectedResult += i;
     }
-    assertEquals("Invalid result", sumResult, expectedResult);
+    assertEquals("Invalid result", expectedResult, sumResult);
     table.close();
   }
 
+  @Test
+  public void testCoprocessorService() throws Throwable {
+    HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
+    NavigableMap<HRegionInfo,ServerName> regions = table.getRegionLocations();
+
+    final TestProtos.EchoRequestProto request =
+        TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
+    final Map<byte[], String> results = Collections.synchronizedMap(
+        new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR));
+    try {
+      // scan: for all regions
+      final RpcController controller = new ServerRpcController();
+      table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
+          ROWS[0], ROWS[ROWS.length - 1],
+          new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
+            public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance)
+                throws IOException {
+              LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
+              BlockingRpcCallback<TestProtos.EchoResponseProto> callback = new BlockingRpcCallback<TestProtos.EchoResponseProto>();
+              instance.echo(controller, request, callback);
+              TestProtos.EchoResponseProto response = callback.get();
+              LOG.debug("Batch.Call returning result " + response);
+              return response;
+            }
+          },
+          new Batch.Callback<TestProtos.EchoResponseProto>() {
+            public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
+              assertNotNull(result);
+              assertEquals("hello", result.getMessage());
+              results.put(region, result.getMessage());
+            }
+          }
+      );
+      for (Map.Entry<byte[], String> e : results.entrySet()) {
+        LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
+      }
+      assertEquals(3, results.size());
+      for (HRegionInfo info : regions.navigableKeySet()) {
+        LOG.info("Region info is "+info.getRegionNameAsString());
+        assertTrue(results.containsKey(info.getRegionName()));
+      }
+      results.clear();
+
+      // scan: for region 2 and region 3
+      table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
+          ROWS[rowSeperator1], ROWS[ROWS.length - 1],
+          new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
+            public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance)
+                throws IOException {
+              LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
+              BlockingRpcCallback<TestProtos.EchoResponseProto> callback = new BlockingRpcCallback<TestProtos.EchoResponseProto>();
+              instance.echo(controller, request, callback);
+              TestProtos.EchoResponseProto response = callback.get();
+              LOG.debug("Batch.Call returning result " + response);
+              return response;
+            }
+          },
+          new Batch.Callback<TestProtos.EchoResponseProto>() {
+            public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
+              assertNotNull(result);
+              assertEquals("hello", result.getMessage());
+              results.put(region, result.getMessage());
+            }
+          }
+      );
+      for (Map.Entry<byte[], String> e : results.entrySet()) {
+        LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
+      }
+      assertEquals(2, results.size());
+    } finally {
+      table.close();
+    }
+  }
+
   private static byte[][] makeN(byte[] base, int n) {
     byte[][] ret = new byte[n][];
     for (int i = 0; i < n; i++) {

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRowCountEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRowCountEndpoint.java?rev=1387001&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRowCountEndpoint.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRowCountEndpoint.java Tue Sep 18 06:32:57 2012
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos;
+import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import static junit.framework.Assert.*;
+
+/**
+ * Test case demonstrating client interactions with the {@link RowCountEndpoint}
+ * sample coprocessor Service implementation.
+ */
+@Category(MediumTests.class)
+public class TestRowCountEndpoint {
+  private static final byte[] TEST_TABLE = Bytes.toBytes("testrowcounter");
+  private static final byte[] TEST_FAMILY = Bytes.toBytes("f");
+  private static final byte[] TEST_COLUMN = Bytes.toBytes("col");
+
+  private static HBaseTestingUtility TEST_UTIL = null;
+  private static Configuration CONF = null;
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility();
+    CONF = TEST_UTIL.getConfiguration();
+    CONF.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+        RowCountEndpoint.class.getName());
+
+    TEST_UTIL.startMiniCluster();
+    TEST_UTIL.createTable(TEST_TABLE, TEST_FAMILY);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testEndpoint() throws Throwable {
+    HTable table = new HTable(CONF, TEST_TABLE);
+
+    // insert some test rows
+    for (int i=0; i<5; i++) {
+      byte[] iBytes = Bytes.toBytes(i);
+      Put p = new Put(iBytes);
+      p.add(TEST_FAMILY, TEST_COLUMN, iBytes);
+      table.put(p);
+    }
+
+    final ExampleProtos.CountRequest request = ExampleProtos.CountRequest.getDefaultInstance();
+    Map<byte[],Long> results = table.coprocessorService(ExampleProtos.RowCountService.class,
+        null, null,
+        new Batch.Call<ExampleProtos.RowCountService,Long>() {
+          public Long call(ExampleProtos.RowCountService counter) throws IOException {
+            ServerRpcController controller = new ServerRpcController();
+            BlockingRpcCallback<ExampleProtos.CountResponse> rpcCallback =
+                new BlockingRpcCallback<ExampleProtos.CountResponse>();
+            counter.getRowCount(controller, request, rpcCallback);
+            ExampleProtos.CountResponse response = rpcCallback.get();
+            if (controller.failedOnException()) {
+              throw controller.getFailedOn();
+            }
+            return (response != null && response.hasCount()) ? response.getCount() : 0;
+          }
+        });
+    // should be one region with results
+    assertEquals(1, results.size());
+    Iterator<Long> iter = results.values().iterator();
+    Long val = iter.next();
+    assertNotNull(val);
+    assertEquals(5l, val.longValue());
+  }
+
+  @org.junit.Rule
+  public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
+      new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
+}

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java Tue Sep 18 06:32:57 2012
@@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
@@ -413,6 +414,12 @@ class MockRegionServer implements AdminP
   }
 
   @Override
+  public ClientProtos.CoprocessorServiceResponse execService(RpcController controller,
+      ClientProtos.CoprocessorServiceRequest request) throws ServiceException {
+    return null;
+  }
+
+  @Override
   public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse multi(
       RpcController controller, MultiRequest request) throws ServiceException {
     // TODO Auto-generated method stub

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java?rev=1387001&r1=1387000&r2=1387001&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java Tue Sep 18 06:32:57 2012
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hbase.security.access;
 
+import static org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
+import static org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.CheckPermissionsRequest;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -27,6 +29,11 @@ import java.security.PrivilegedException
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.Lists;
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.ServiceException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -52,6 +59,8 @@ import org.apache.hadoop.hbase.coprocess
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
@@ -128,26 +137,32 @@ public class TestAccessController {
 
     // initilize access control
     HTable meta = new HTable(conf, AccessControlLists.ACL_TABLE_NAME);
-    AccessControllerProtocol protocol = meta.coprocessorProxy(AccessControllerProtocol.class,
-      TEST_TABLE);
+    BlockingRpcChannel service = meta.coprocessorService(TEST_TABLE);
+    AccessControlService.BlockingInterface protocol =
+        AccessControlService.newBlockingStub(service);
 
     HRegion region = TEST_UTIL.getHBaseCluster().getRegions(TEST_TABLE).get(0);
     RegionCoprocessorHost rcpHost = region.getCoprocessorHost();
     RCP_ENV = rcpHost.createEnvironment(AccessController.class, ACCESS_CONTROLLER,
       Coprocessor.PRIORITY_HIGHEST, 1, conf);
 
-    protocol.grant(new UserPermission(Bytes.toBytes(USER_ADMIN.getShortName()),
-        Permission.Action.ADMIN, Permission.Action.CREATE, Permission.Action.READ,
-        Permission.Action.WRITE));
-
-    protocol.grant(new UserPermission(Bytes.toBytes(USER_RW.getShortName()), TEST_TABLE,
-        TEST_FAMILY, Permission.Action.READ, Permission.Action.WRITE));
+    protocol.grant(null, newGrantRequest(USER_ADMIN.getShortName(),
+        null, null, null,
+        AccessControlProtos.Permission.Action.ADMIN,
+        AccessControlProtos.Permission.Action.CREATE,
+        AccessControlProtos.Permission.Action.READ,
+        AccessControlProtos.Permission.Action.WRITE));
+
+    protocol.grant(null, newGrantRequest(USER_RW.getShortName(),
+        TEST_TABLE, TEST_FAMILY, null,
+        AccessControlProtos.Permission.Action.READ,
+        AccessControlProtos.Permission.Action.WRITE));
 
-    protocol.grant(new UserPermission(Bytes.toBytes(USER_RO.getShortName()), TEST_TABLE,
-        TEST_FAMILY, Permission.Action.READ));
+    protocol.grant(null, newGrantRequest(USER_RO.getShortName(), TEST_TABLE,
+        TEST_FAMILY, null, AccessControlProtos.Permission.Action.READ));
 
-    protocol.grant(new UserPermission(Bytes.toBytes(USER_CREATE.getShortName()), TEST_TABLE, null,
-        Permission.Action.CREATE));
+    protocol.grant(null, newGrantRequest(USER_CREATE.getShortName(),
+        TEST_TABLE, null, null, AccessControlProtos.Permission.Action.CREATE));
   }
 
   @AfterClass
@@ -155,6 +170,32 @@ public class TestAccessController {
     TEST_UTIL.shutdownMiniCluster();
   }
 
+  private static AccessControlProtos.GrantRequest newGrantRequest(
+      String username, byte[] table, byte[] family, byte[] qualifier,
+      AccessControlProtos.Permission.Action... actions) {
+    AccessControlProtos.Permission.Builder permissionBuilder =
+        AccessControlProtos.Permission.newBuilder();
+    for (AccessControlProtos.Permission.Action a : actions) {
+      permissionBuilder.addAction(a);
+    }
+    if (table != null) {
+      permissionBuilder.setTable(ByteString.copyFrom(table));
+    }
+    if (family != null) {
+      permissionBuilder.setFamily(ByteString.copyFrom(family));
+    }
+    if (qualifier != null) {
+      permissionBuilder.setQualifier(ByteString.copyFrom(qualifier));
+    }
+
+    return AccessControlProtos.GrantRequest.newBuilder()
+        .setPermission(
+            AccessControlProtos.UserPermission.newBuilder()
+                .setUser(ByteString.copyFromUtf8(username))
+                .setPermission(permissionBuilder.build())
+        ).build();
+  }
+
   public void verifyAllowed(User user, PrivilegedExceptionAction... actions) throws Exception {
     for (PrivilegedExceptionAction action : actions) {
       try {
@@ -182,7 +223,13 @@ public class TestAccessController {
         // AccessDeniedException
         boolean isAccessDeniedException = false;
         for (Throwable ex : e.getCauses()) {
-          if (ex instanceof AccessDeniedException) {
+          if (ex instanceof ServiceException) {
+            ServiceException se = (ServiceException)ex;
+            if (se.getCause() != null && se.getCause() instanceof AccessDeniedException) {
+              isAccessDeniedException = true;
+              break;
+            }
+          } else if (ex instanceof AccessDeniedException) {
             isAccessDeniedException = true;
             break;
           }
@@ -1117,15 +1164,25 @@ public class TestAccessController {
 
   public void checkGlobalPerms(Permission.Action... actions) throws IOException {
     HTable acl = new HTable(conf, AccessControlLists.ACL_TABLE_NAME);
-    AccessControllerProtocol protocol = acl.coprocessorProxy(AccessControllerProtocol.class,
-      new byte[0]);
+    BlockingRpcChannel channel = acl.coprocessorService(new byte[0]);
+    AccessControlService.BlockingInterface protocol =
+        AccessControlService.newBlockingStub(channel);
 
     Permission[] perms = new Permission[actions.length];
     for (int i = 0; i < actions.length; i++) {
       perms[i] = new Permission(actions[i]);
     }
 
-    protocol.checkPermissions(perms);
+    CheckPermissionsRequest.Builder request = CheckPermissionsRequest.newBuilder();
+    for (Action a : actions) {
+      request.addPermission(AccessControlProtos.Permission.newBuilder()
+          .addAction(ProtobufUtil.toPermissionAction(a)).build());
+    }
+    try {
+      protocol.checkPermissions(null, request.build());
+    } catch (ServiceException se) {
+      ProtobufUtil.toIOException(se);
+    }
   }
 
   public void checkTablePerms(byte[] table, byte[] family, byte[] column,
@@ -1140,22 +1197,39 @@ public class TestAccessController {
 
   public void checkTablePerms(byte[] table, Permission... perms) throws IOException {
     HTable acl = new HTable(conf, table);
-    AccessControllerProtocol protocol = acl.coprocessorProxy(AccessControllerProtocol.class,
-      new byte[0]);
-
-    protocol.checkPermissions(perms);
+    AccessControlService.BlockingInterface protocol =
+        AccessControlService.newBlockingStub(acl.coprocessorService(new byte[0]));
+    CheckPermissionsRequest.Builder request = CheckPermissionsRequest.newBuilder();
+    for (Permission p : perms) {
+      request.addPermission(ProtobufUtil.toPermission(p));
+    }
+    try {
+      protocol.checkPermissions(null, request.build());
+    } catch (ServiceException se) {
+      ProtobufUtil.toIOException(se);
+    }
   }
 
-  public void grant(AccessControllerProtocol protocol, User user, byte[] t, byte[] f, byte[] q,
-      Permission.Action... actions) throws IOException {
-    protocol.grant(new UserPermission(Bytes.toBytes(user.getShortName()), t, f, q, actions));
+  public void grant(AccessControlService.BlockingInterface protocol, User user,
+      byte[] t, byte[] f, byte[] q, Permission.Action... actions)
+      throws ServiceException {
+    List<AccessControlProtos.Permission.Action> permActions =
+        Lists.newArrayListWithCapacity(actions.length);
+    for (Action a : actions) {
+      permActions.add(ProtobufUtil.toPermissionAction(a));
+    }
+    AccessControlProtos.GrantRequest request =
+        newGrantRequest(user.getShortName(), t, f, q, permActions.toArray(
+            new AccessControlProtos.Permission.Action[actions.length]));
+    protocol.grant(null, request);
   }
 
   @Test
   public void testCheckPermissions() throws Exception {
     final HTable acl = new HTable(conf, AccessControlLists.ACL_TABLE_NAME);
-    final AccessControllerProtocol protocol = acl.coprocessorProxy(AccessControllerProtocol.class,
-      TEST_TABLE);
+    BlockingRpcChannel channel = acl.coprocessorService(new byte[0]);
+    AccessControlService.BlockingInterface protocol =
+        AccessControlService.newBlockingStub(channel);
 
     // --------------------------------------
     // test global permissions
@@ -1278,11 +1352,15 @@ public class TestAccessController {
     // --------------------------------------
     // check for wrong table region
     try {
+      CheckPermissionsRequest checkRequest =
+          CheckPermissionsRequest.newBuilder().addPermission(
+              AccessControlProtos.Permission.newBuilder()
+                  .setTable(ByteString.copyFrom(TEST_TABLE)).addAction(AccessControlProtos.Permission.Action.CREATE)
+          ).build();
       // but ask for TablePermissions for TEST_TABLE
-      protocol.checkPermissions(new Permission[] { (Permission) new TablePermission(TEST_TABLE,
-          null, (byte[]) null, Permission.Action.CREATE) });
+      protocol.checkPermissions(null, checkRequest);
       fail("this should have thrown CoprocessorException");
-    } catch (CoprocessorException ex) {
+    } catch (ServiceException ex) {
       // expected
     }