You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/11/14 18:03:11 UTC
svn commit: r1409257 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/client/
main/java/org/apache/hadoop/hbase/ipc/
main/java/org/apache/hadoop/hbase/master/
main/java/org/apache/hadoop/hbase/protobuf/ main/java/org/apache/hadoop/...
Author: tedyu
Date: Wed Nov 14 17:03:06 2012
New Revision: 1409257
URL: http://svn.apache.org/viewvc?rev=1409257&view=rev
Log:
HBASE-7042 Master Coprocessor Endpoint (Francis Liu)
Added:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MasterAdminProtos.java
hbase/trunk/hbase-server/src/main/protobuf/MasterAdmin.proto
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1409257&r1=1409256&r2=1409257&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Wed Nov 14 17:03:06 2012
@@ -57,6 +57,8 @@ import org.apache.hadoop.hbase.catalog.C
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
@@ -2141,4 +2143,29 @@ public class HBaseAdmin implements Abort
throw new IOException("Unexpected exception when calling master", e);
}
}
+
+ /**
+ * Builds a new {@link com.google.protobuf.RpcChannel} that is connected to the active master.
+ *
+ * <p>
+ * Hand the returned channel to a generated protobuf stub in order to invoke a published
+ * coprocessor {@link com.google.protobuf.Service} with standard protobuf service calls:
+ * </p>
+ *
+ * <div style="background-color: #cccccc; padding: 2px">
+ * <blockquote><pre>
+ * CoprocessorRpcChannel channel = myAdmin.coprocessorService();
+ * MyService.BlockingInterface service = MyService.newBlockingStub(channel);
+ * MyCallRequest request = MyCallRequest.newBuilder()
+ * ...
+ * .build();
+ * MyCallResponse response = service.myCall(null, request);
+ * </pre></blockquote></div>
+ *
+ * @return a newly constructed {@link MasterCoprocessorRpcChannel} built on this admin's connection
+ */
+ public CoprocessorRpcChannel coprocessorService() {
+ final CoprocessorRpcChannel masterChannel = new MasterCoprocessorRpcChannel(connection);
+ return masterChannel;
+ }
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1409257&r1=1409256&r2=1409257&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java Wed Nov 14 17:03:06 2012
@@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.io.DataIn
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.ExecRPCInvoker;
+import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
@@ -1343,7 +1344,7 @@ public class HTable implements HTableInt
* {@inheritDoc}
*/
public CoprocessorRpcChannel coprocessorService(byte[] row) {
- return new CoprocessorRpcChannel(connection, tableName, row);
+ return new RegionCoprocessorRpcChannel(connection, tableName, row);
}
/**
@@ -1420,8 +1421,8 @@ public class HTable implements HTableInt
Map<byte[],Future<R>> futures =
new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
for (final byte[] r : keys) {
- final CoprocessorRpcChannel channel =
- new CoprocessorRpcChannel(connection, tableName, r);
+ final RegionCoprocessorRpcChannel channel =
+ new RegionCoprocessorRpcChannel(connection, tableName, r);
Future<R> future = pool.submit(
new Callable<R>() {
public R call() throws Exception {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java?rev=1409257&r1=1409256&r2=1409257&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java Wed Nov 14 17:03:06 2012
@@ -22,42 +22,18 @@ import com.google.protobuf.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.ServerCallable;
-import org.apache.hadoop.hbase.client.coprocessor.Exec;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
-import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
-
/**
- * Provides clients with an RPC connection to call coprocessor endpoint {@link Service}s
- * against a given table region. An instance of this class may be obtained
- * by calling {@link org.apache.hadoop.hbase.client.HTable#coprocessorService(byte[])},
- * but should normally only be used in creating a new {@link Service} stub to call the endpoint
- * methods.
- * @see org.apache.hadoop.hbase.client.HTable#coprocessorService(byte[])
+ * Base class which provides clients with an RPC connection to
+ * call coprocessor endpoint {@link Service}s
*/
@InterfaceAudience.Private
-public class CoprocessorRpcChannel implements RpcChannel, BlockingRpcChannel {
+public abstract class CoprocessorRpcChannel implements RpcChannel, BlockingRpcChannel {
private static Log LOG = LogFactory.getLog(CoprocessorRpcChannel.class);
- private final HConnection connection;
- private final byte[] table;
- private final byte[] row;
- private byte[] lastRegion;
-
- public CoprocessorRpcChannel(HConnection conn, byte[] table, byte[] row) {
- this.connection = conn;
- this.table = table;
- this.row = row;
- }
-
@Override
public void callMethod(Descriptors.MethodDescriptor method,
RpcController controller,
@@ -87,46 +63,6 @@ public class CoprocessorRpcChannel imple
}
}
- private Message callExecService(Descriptors.MethodDescriptor method,
- Message request, Message responsePrototype)
- throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Call: "+method.getName()+", "+request.toString());
- }
-
- if (row == null) {
- throw new IllegalArgumentException("Missing row property for remote region location");
- }
-
- final ClientProtos.CoprocessorServiceCall call =
- ClientProtos.CoprocessorServiceCall.newBuilder()
- .setRow(ByteString.copyFrom(row))
- .setServiceName(method.getService().getFullName())
- .setMethodName(method.getName())
- .setRequest(request.toByteString()).build();
- ServerCallable<ClientProtos.CoprocessorServiceResponse> callable =
- new ServerCallable<ClientProtos.CoprocessorServiceResponse>(connection, table, row) {
- public CoprocessorServiceResponse call() throws Exception {
- byte[] regionName = location.getRegionInfo().getRegionName();
- return ProtobufUtil.execService(server, call, regionName);
- }
- };
- CoprocessorServiceResponse result = callable.withRetries();
- Message response = null;
- if (result.getValue().hasValue()) {
- response = responsePrototype.newBuilderForType()
- .mergeFrom(result.getValue().getValue()).build();
- } else {
- response = responsePrototype.getDefaultInstanceForType();
- }
- lastRegion = result.getRegion().getValue().toByteArray();
- if (LOG.isTraceEnabled()) {
- LOG.trace("Result is region=" + Bytes.toStringBinary(lastRegion) + ", value=" + response);
- }
- return response;
- }
-
- public byte[] getLastRegion() {
- return lastRegion;
- }
+ protected abstract Message callExecService(Descriptors.MethodDescriptor method,
+ Message request, Message responsePrototype) throws IOException;
}
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java?rev=1409257&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java Wed Nov 14 17:03:06 2012
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
+
+/**
+ * Provides clients with an RPC connection to call coprocessor endpoint
+ * {@link com.google.protobuf.Service}s against the active master. An instance of this class
+ * may be obtained by calling
+ * {@link org.apache.hadoop.hbase.client.HBaseAdmin#coprocessorService()}, but should normally
+ * only be used in creating a new {@link com.google.protobuf.Service} stub to call the endpoint
+ * methods.
+ * @see org.apache.hadoop.hbase.client.HBaseAdmin#coprocessorService()
+ */
+@InterfaceAudience.Private
+public class MasterCoprocessorRpcChannel extends CoprocessorRpcChannel {
+ // The logger is a per-class constant and is never reassigned, hence final.
+ private static final Log LOG = LogFactory.getLog(MasterCoprocessorRpcChannel.class);
+
+ /** Connection from which the master admin interface is obtained on every call. */
+ private final HConnection connection;
+
+ /**
+ * @param conn connection used to reach the master
+ */
+ public MasterCoprocessorRpcChannel(HConnection conn) {
+ this.connection = conn;
+ }
+
+ /**
+ * Wraps the endpoint invocation in a {@code CoprocessorServiceCall} (with an empty row),
+ * sends it through {@link ProtobufUtil#execService} on the connection's master admin
+ * interface and decodes the reply.
+ *
+ * @param method descriptor of the endpoint method being invoked
+ * @param request endpoint request message; it is sent in serialized form
+ * @param responsePrototype prototype used to build the typed response
+ * @return the response parsed from the returned value, or the prototype's default instance
+ * when the reply carries no value
+ * @throws IOException if obtaining the master admin interface or executing the call fails
+ */
+ @Override
+ protected Message callExecService(Descriptors.MethodDescriptor method,
+ Message request, Message responsePrototype)
+ throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Call: "+method.getName()+", "+request.toString());
+ }
+
+ final ClientProtos.CoprocessorServiceCall call =
+ ClientProtos.CoprocessorServiceCall.newBuilder()
+ .setRow(ByteString.copyFrom(HConstants.EMPTY_BYTE_ARRAY))
+ .setServiceName(method.getService().getFullName())
+ .setMethodName(method.getName())
+ .setRequest(request.toByteString()).build();
+ CoprocessorServiceResponse result = ProtobufUtil.execService(connection.getMasterAdmin(), call);
+ Message response = null;
+ if (result.getValue().hasValue()) {
+ response = responsePrototype.newBuilderForType()
+ .mergeFrom(result.getValue().getValue()).build();
+ } else {
+ response = responsePrototype.getDefaultInstanceForType();
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Master Result is value=" + response);
+ }
+ return response;
+ }
+
+}
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java?rev=1409257&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java Wed Nov 14 17:03:06 2012
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
+
+/**
+ * Provides clients with an RPC connection to call coprocessor endpoint
+ * {@link com.google.protobuf.Service}s against a given table region. An instance of this class
+ * may be obtained by calling
+ * {@link org.apache.hadoop.hbase.client.HTable#coprocessorService(byte[])}, but should normally
+ * only be used in creating a new {@link com.google.protobuf.Service} stub to call the endpoint
+ * methods.
+ * @see org.apache.hadoop.hbase.client.HTable#coprocessorService(byte[])
+ */
+@InterfaceAudience.Private
+public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel {
+ // The logger is a per-class constant and is never reassigned, hence final.
+ private static final Log LOG = LogFactory.getLog(RegionCoprocessorRpcChannel.class);
+
+ private final HConnection connection;
+ private final byte[] table;
+ /** Row used to locate the target region; must be non-null by the time a call is made. */
+ private final byte[] row;
+ /** Region name taken from the most recent response; stays null until a call has returned. */
+ private byte[] lastRegion;
+
+ /**
+ * @param conn connection used to reach the region server
+ * @param table name of the table whose region hosts the endpoint
+ * @param row row key that selects the region
+ */
+ public RegionCoprocessorRpcChannel(HConnection conn, byte[] table, byte[] row) {
+ this.connection = conn;
+ this.table = table;
+ this.row = row;
+ }
+
+ /**
+ * Wraps the endpoint invocation in a {@code CoprocessorServiceCall} for {@code row}, runs it
+ * through a {@link ServerCallable} with retries and decodes the reply. The region reported
+ * in the response is remembered and exposed through {@link #getLastRegion()}.
+ *
+ * @param method descriptor of the endpoint method being invoked
+ * @param request endpoint request message; it is sent in serialized form
+ * @param responsePrototype prototype used to build the typed response
+ * @return the response parsed from the returned value, or the prototype's default instance
+ * when the reply carries no value
+ * @throws IOException if the call fails
+ * @throws IllegalArgumentException if this channel was created without a row
+ */
+ @Override
+ protected Message callExecService(Descriptors.MethodDescriptor method,
+ Message request, Message responsePrototype)
+ throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Call: "+method.getName()+", "+request.toString());
+ }
+
+ if (row == null) {
+ throw new IllegalArgumentException("Missing row property for remote region location");
+ }
+
+ final ClientProtos.CoprocessorServiceCall call =
+ ClientProtos.CoprocessorServiceCall.newBuilder()
+ .setRow(ByteString.copyFrom(row))
+ .setServiceName(method.getService().getFullName())
+ .setMethodName(method.getName())
+ .setRequest(request.toByteString()).build();
+ ServerCallable<CoprocessorServiceResponse> callable =
+ new ServerCallable<CoprocessorServiceResponse>(connection, table, row) {
+ public CoprocessorServiceResponse call() throws Exception {
+ // The region name is read from the callable's current location on each attempt.
+ byte[] regionName = location.getRegionInfo().getRegionName();
+ return ProtobufUtil.execService(server, call, regionName);
+ }
+ };
+ CoprocessorServiceResponse result = callable.withRetries();
+ Message response = null;
+ if (result.getValue().hasValue()) {
+ response = responsePrototype.newBuilderForType()
+ .mergeFrom(result.getValue().getValue()).build();
+ } else {
+ response = responsePrototype.getDefaultInstanceForType();
+ }
+ lastRegion = result.getRegion().getValue().toByteArray();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Result is region=" + Bytes.toStringBinary(lastRegion) + ", value=" + response);
+ }
+ return response;
+ }
+
+ /**
+ * @return the region name carried by the response of the most recent call made through this
+ * channel, or {@code null} if no call has returned yet. The field is written without
+ * synchronization.
+ */
+ public byte[] getLastRegion() {
+ return lastRegion;
+ }
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1409257&r1=1409256&r2=1409257&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Wed Nov 14 17:03:06 2012
@@ -40,6 +40,11 @@ import java.util.concurrent.atomic.Atomi
import javax.management.ObjectName;
+import com.google.common.collect.Maps;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -82,6 +87,7 @@ import org.apache.hadoop.hbase.ipc.HBase
import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.balancer.BalancerChore;
import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
@@ -101,7 +107,9 @@ import org.apache.hadoop.hbase.monitorin
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
@@ -159,6 +167,7 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
@@ -309,6 +318,8 @@ Server {
private SpanReceiverHost spanReceiverHost;
+ private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
+
/**
* Initializes the HMaster. The steps are as follows:
* <p>
@@ -2274,6 +2285,79 @@ Server {
return OfflineRegionResponse.newBuilder().build();
}
+ @Override
+ public boolean registerService(Service instance) {
+ // Handlers are keyed by the service's full name; only the first instance registered
+ // under a given name is accepted, no stacking of instances is allowed.
+ final String serviceName = instance.getDescriptorForType().getFullName();
+ final boolean alreadyRegistered = coprocessorServiceHandlers.containsKey(serviceName);
+ if (!alreadyRegistered) {
+ coprocessorServiceHandlers.put(serviceName, instance);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Registered master coprocessor service: service="+serviceName);
+ }
+ return true;
+ }
+
+ LOG.error("Coprocessor service "+serviceName+
+ " already registered, rejecting request from "+instance
+ );
+ return false;
+ }
+
+ /**
+ * Dispatches a coprocessor endpoint call to the {@link Service} registered under the
+ * requested service name and wraps the endpoint's reply.
+ *
+ * @param controller RPC controller supplied by the caller; it is not handed to the endpoint
+ * @param request carries the service name, the method name and the serialized endpoint request
+ * @return a response holding an empty region specifier and the serialized endpoint result
+ * @throws ServiceException wrapping any {@link IOException}: an unknown service or method
+ * name, or a failure the endpoint recorded on the controller it was given
+ */
+ @Override
+ public ClientProtos.CoprocessorServiceResponse execMasterService(final RpcController controller,
+ final ClientProtos.CoprocessorServiceRequest request) throws ServiceException {
+ try {
+ ServerRpcController execController = new ServerRpcController();
+
+ ClientProtos.CoprocessorServiceCall call = request.getCall();
+ String serviceName = call.getServiceName();
+ String methodName = call.getMethodName();
+ if (!coprocessorServiceHandlers.containsKey(serviceName)) {
+ throw new HBaseRPC.UnknownProtocolException(null,
+ "No registered master coprocessor service found for name "+serviceName);
+ }
+
+ Service service = coprocessorServiceHandlers.get(serviceName);
+ Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
+ Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
+ if (methodDesc == null) {
+ throw new HBaseRPC.UnknownProtocolException(service.getClass(),
+ "Unknown method "+methodName+" called on master service "+serviceName);
+ }
+
+ //invoke the method
+ Message execRequest = service.getRequestPrototype(methodDesc).newBuilderForType()
+ .mergeFrom(call.getRequest()).build();
+ final Message.Builder responseBuilder =
+ service.getResponsePrototype(methodDesc).newBuilderForType();
+ // Give the endpoint the server-side controller created above rather than the caller's
+ // one: only then is a failure recorded by the endpoint visible to the getFailedOn()
+ // check below.
+ service.callMethod(methodDesc, execController, execRequest, new RpcCallback<Message>() {
+ @Override
+ public void run(Message message) {
+ if (message != null) {
+ responseBuilder.mergeFrom(message);
+ }
+ }
+ });
+ Message execResult = responseBuilder.build();
+
+ // Surface an endpoint failure to the client instead of returning a partial result.
+ if (execController.getFailedOn() != null) {
+ throw execController.getFailedOn();
+ }
+ ClientProtos.CoprocessorServiceResponse.Builder builder =
+ ClientProtos.CoprocessorServiceResponse.newBuilder();
+ builder.setRegion(RequestConverter.buildRegionSpecifier(
+ RegionSpecifierType.REGION_NAME, HConstants.EMPTY_BYTE_ARRAY));
+ builder.setValue(
+ builder.getValueBuilder().setName(execResult.getClass().getName())
+ .setValue(execResult.toByteString()));
+ return builder.build();
+ } catch (IOException ie) {
+ throw new ServiceException(ie);
+ }
+ }
+
/**
* Utility for constructing an instance of the passed HMaster class.
* @param masterClass
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java?rev=1409257&r1=1409256&r2=1409257&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java Wed Nov 14 17:03:06 2012
@@ -70,6 +70,11 @@ public class MasterCoprocessorHost
public MasterEnvironment createEnvironment(final Class<?> implClass,
final Coprocessor instance, final int priority, final int seq,
final Configuration conf) {
+ // Publish the coprocessor's endpoint when one of the interfaces declared by implClass is,
+ // or extends, CoprocessorService. Stop at the first match: registering once is enough, and
+ // HMaster#registerService rejects a repeated registration under the same service name
+ // and logs it as an error.
+ for (Class<?> c : implClass.getInterfaces()) {
+ if (CoprocessorService.class.isAssignableFrom(c)) {
+ masterServices.registerService(((CoprocessorService)instance).getService());
+ break;
+ }
+ }
return new MasterEnvironment(implClass, instance, priority, seq, conf,
masterServices);
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java?rev=1409257&r1=1409256&r2=1409257&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java Wed Nov 14 17:03:06 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException;
+import com.google.protobuf.Service;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
@@ -77,4 +78,23 @@ public interface MasterServices extends
* @return true if master enables ServerShutdownHandler;
*/
public boolean isServerShutdownHandlerEnabled();
+
+ /**
+ * Registers a new protocol buffer {@link Service} subclass as a master coprocessor endpoint to
+ * be available for handling
+ * {@link org.apache.hadoop.hbase.MasterAdminProtocol#execMasterService(com.google.protobuf.RpcController,
+ * org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest)} calls.
+ *
+ * <p>
+ * Only a single instance may be registered for a given {@link Service} subclass (the
+ * instances are keyed on {@link com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}).
+ * After the first registration, subsequent calls with the same service name will fail with
+ * a return value of {@code false}.
+ * </p>
+ * @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint
+ * @return {@code true} if the registration was successful, {@code false}
+ * otherwise
+ */
+ public boolean registerService(Service instance);
+
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1409257&r1=1409256&r2=1409257&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Wed Nov 14 17:03:06 2012
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.HConstant
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MasterAdminProtocol;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.AdminProtocol;
@@ -1347,6 +1348,20 @@ public final class ProtobufUtil {
}
}
+ /**
+ * Sends a coprocessor endpoint call to the master via
+ * {@code MasterAdminProtocol.execMasterService}.
+ *
+ * @param client master admin protocol the request is issued on
+ * @param call the endpoint invocation to forward
+ * @return the response returned by the master
+ * @throws IOException the exception obtained from {@code getRemoteException} when the call
+ * raises a {@link ServiceException}
+ */
+ public static CoprocessorServiceResponse execService(final MasterAdminProtocol client,
+ final CoprocessorServiceCall call) throws IOException {
+ final CoprocessorServiceRequest.Builder requestBuilder = CoprocessorServiceRequest.newBuilder();
+ requestBuilder.setCall(call);
+ // The region specifier is filled in with an empty region name.
+ requestBuilder.setRegion(
+ RequestConverter.buildRegionSpecifier(REGION_NAME, HConstants.EMPTY_BYTE_ARRAY));
+ final CoprocessorServiceRequest masterRequest = requestBuilder.build();
+ try {
+ return client.execMasterService(null, masterRequest);
+ } catch (ServiceException se) {
+ throw getRemoteException(se);
+ }
+ }
+
@SuppressWarnings("unchecked")
public static <T extends Service> T newServiceStub(Class<T> service, RpcChannel channel)
throws Exception {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MasterAdminProtos.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MasterAdminProtos.java?rev=1409257&r1=1409256&r2=1409257&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MasterAdminProtos.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MasterAdminProtos.java Wed Nov 14 17:03:06 2012
@@ -14464,6 +14464,11 @@ public final class MasterAdminProtos {
org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse> done);
+ public abstract void execMasterService(
+ com.google.protobuf.RpcController controller,
+ org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest request,
+ com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse> done);
+
}
public static com.google.protobuf.Service newReflectiveService(
@@ -14621,6 +14626,14 @@ public final class MasterAdminProtos {
impl.isCatalogJanitorEnabled(controller, request, done);
}
+ @java.lang.Override
+ public void execMasterService(
+ com.google.protobuf.RpcController controller,
+ org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest request,
+ com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse> done) {
+ impl.execMasterService(controller, request, done);
+ }
+
};
}
@@ -14681,6 +14694,8 @@ public final class MasterAdminProtos {
return impl.enableCatalogJanitor(controller, (org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorRequest)request);
case 18:
return impl.isCatalogJanitorEnabled(controller, (org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest)request);
+ case 19:
+ return impl.execMasterService(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest)request);
default:
throw new java.lang.AssertionError("Can't get here.");
}
@@ -14733,6 +14748,8 @@ public final class MasterAdminProtos {
return org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorRequest.getDefaultInstance();
case 18:
return org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest.getDefaultInstance();
+ case 19:
+ return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
@@ -14785,6 +14802,8 @@ public final class MasterAdminProtos {
return org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorResponse.getDefaultInstance();
case 18:
return org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse.getDefaultInstance();
+ case 19:
+ return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
@@ -14888,6 +14907,11 @@ public final class MasterAdminProtos {
org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse> done);
+ public abstract void execMasterService(
+ com.google.protobuf.RpcController controller,
+ org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest request,
+ com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse> done);
+
public static final
com.google.protobuf.Descriptors.ServiceDescriptor
getDescriptor() {
@@ -15005,6 +15029,11 @@ public final class MasterAdminProtos {
com.google.protobuf.RpcUtil.<org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse>specializeCallback(
done));
return;
+ case 19:
+ this.execMasterService(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest)request,
+ com.google.protobuf.RpcUtil.<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse>specializeCallback(
+ done));
+ return;
default:
throw new java.lang.AssertionError("Can't get here.");
}
@@ -15057,6 +15086,8 @@ public final class MasterAdminProtos {
return org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorRequest.getDefaultInstance();
case 18:
return org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest.getDefaultInstance();
+ case 19:
+ return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
@@ -15109,6 +15140,8 @@ public final class MasterAdminProtos {
return org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorResponse.getDefaultInstance();
case 18:
return org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse.getDefaultInstance();
+ case 19:
+ return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
@@ -15414,6 +15447,21 @@ public final class MasterAdminProtos {
org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse.class,
org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse.getDefaultInstance()));
}
+
+ public void execMasterService(
+ com.google.protobuf.RpcController controller,
+ org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest request,
+ com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse> done) {
+ channel.callMethod(
+ getDescriptor().getMethods().get(19),
+ controller,
+ request,
+ org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse.getDefaultInstance(),
+ com.google.protobuf.RpcUtil.generalizeCallback(
+ done,
+ org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse.class,
+ org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse.getDefaultInstance()));
+ }
}
public static BlockingInterface newBlockingStub(
@@ -15516,6 +15564,11 @@ public final class MasterAdminProtos {
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest request)
throws com.google.protobuf.ServiceException;
+
+ public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse execMasterService(
+ com.google.protobuf.RpcController controller,
+ org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest request)
+ throws com.google.protobuf.ServiceException;
}
private static final class BlockingStub implements BlockingInterface {
@@ -15752,6 +15805,18 @@ public final class MasterAdminProtos {
org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse.getDefaultInstance());
}
+
+ public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse execMasterService(
+ com.google.protobuf.RpcController controller,
+ org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest request)
+ throws com.google.protobuf.ServiceException {
+ return (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse) channel.callBlockingMethod(
+ getDescriptor().getMethods().get(19),
+ controller,
+ request,
+ org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse.getDefaultInstance());
+ }
+
}
}
@@ -15954,79 +16019,81 @@ public final class MasterAdminProtos {
descriptor;
static {
java.lang.String[] descriptorData = {
- "\n\021MasterAdmin.proto\032\013hbase.proto\"R\n\020AddC" +
- "olumnRequest\022\021\n\ttableName\030\001 \002(\014\022+\n\016colum" +
- "nFamilies\030\002 \002(\0132\023.ColumnFamilySchema\"\023\n\021" +
- "AddColumnResponse\"<\n\023DeleteColumnRequest" +
- "\022\021\n\ttableName\030\001 \002(\014\022\022\n\ncolumnName\030\002 \002(\014\"" +
- "\026\n\024DeleteColumnResponse\"U\n\023ModifyColumnR" +
- "equest\022\021\n\ttableName\030\001 \002(\014\022+\n\016columnFamil" +
- "ies\030\002 \002(\0132\023.ColumnFamilySchema\"\026\n\024Modify" +
- "ColumnResponse\"Z\n\021MoveRegionRequest\022 \n\006r" +
- "egion\030\001 \002(\0132\020.RegionSpecifier\022#\n\016destSer",
- "verName\030\002 \001(\0132\013.ServerName\"\024\n\022MoveRegion" +
- "Response\"7\n\023AssignRegionRequest\022 \n\006regio" +
- "n\030\001 \002(\0132\020.RegionSpecifier\"\026\n\024AssignRegio" +
- "nResponse\"O\n\025UnassignRegionRequest\022 \n\006re" +
- "gion\030\001 \002(\0132\020.RegionSpecifier\022\024\n\005force\030\002 " +
- "\001(\010:\005false\"\030\n\026UnassignRegionResponse\"8\n\024" +
- "OfflineRegionRequest\022 \n\006region\030\001 \002(\0132\020.R" +
- "egionSpecifier\"\027\n\025OfflineRegionResponse\"" +
- "J\n\022CreateTableRequest\022!\n\013tableSchema\030\001 \002" +
- "(\0132\014.TableSchema\022\021\n\tsplitKeys\030\002 \003(\014\"\025\n\023C",
- "reateTableResponse\"\'\n\022DeleteTableRequest" +
- "\022\021\n\ttableName\030\001 \002(\014\"\025\n\023DeleteTableRespon" +
- "se\"\'\n\022EnableTableRequest\022\021\n\ttableName\030\001 " +
- "\002(\014\"\025\n\023EnableTableResponse\"(\n\023DisableTab" +
- "leRequest\022\021\n\ttableName\030\001 \002(\014\"\026\n\024DisableT" +
- "ableResponse\"J\n\022ModifyTableRequest\022\021\n\tta" +
- "bleName\030\001 \002(\014\022!\n\013tableSchema\030\002 \002(\0132\014.Tab" +
- "leSchema\"\025\n\023ModifyTableResponse\"\021\n\017Shutd" +
- "ownRequest\"\022\n\020ShutdownResponse\"\023\n\021StopMa" +
- "sterRequest\"\024\n\022StopMasterResponse\"\020\n\016Bal",
- "anceRequest\"&\n\017BalanceResponse\022\023\n\013balanc" +
- "erRan\030\001 \002(\010\"<\n\031SetBalancerRunningRequest" +
- "\022\n\n\002on\030\001 \002(\010\022\023\n\013synchronous\030\002 \001(\010\"6\n\032Set" +
- "BalancerRunningResponse\022\030\n\020prevBalanceVa" +
- "lue\030\001 \001(\010\"\024\n\022CatalogScanRequest\")\n\023Catal" +
- "ogScanResponse\022\022\n\nscanResult\030\001 \001(\005\"-\n\033En" +
- "ableCatalogJanitorRequest\022\016\n\006enable\030\001 \002(" +
- "\010\"1\n\034EnableCatalogJanitorResponse\022\021\n\tpre" +
- "vValue\030\001 \001(\010\" \n\036IsCatalogJanitorEnabledR" +
- "equest\"0\n\037IsCatalogJanitorEnabledRespons",
- "e\022\r\n\005value\030\001 \002(\0102\263\t\n\022MasterAdminService\022" +
- "2\n\taddColumn\022\021.AddColumnRequest\032\022.AddCol" +
- "umnResponse\022;\n\014deleteColumn\022\024.DeleteColu" +
- "mnRequest\032\025.DeleteColumnResponse\022;\n\014modi" +
- "fyColumn\022\024.ModifyColumnRequest\032\025.ModifyC" +
- "olumnResponse\0225\n\nmoveRegion\022\022.MoveRegion" +
- "Request\032\023.MoveRegionResponse\022;\n\014assignRe" +
- "gion\022\024.AssignRegionRequest\032\025.AssignRegio" +
- "nResponse\022A\n\016unassignRegion\022\026.UnassignRe" +
- "gionRequest\032\027.UnassignRegionResponse\022>\n\r",
- "offlineRegion\022\025.OfflineRegionRequest\032\026.O" +
- "fflineRegionResponse\0228\n\013deleteTable\022\023.De" +
- "leteTableRequest\032\024.DeleteTableResponse\0228" +
- "\n\013enableTable\022\023.EnableTableRequest\032\024.Ena" +
- "bleTableResponse\022;\n\014disableTable\022\024.Disab" +
- "leTableRequest\032\025.DisableTableResponse\0228\n" +
- "\013modifyTable\022\023.ModifyTableRequest\032\024.Modi" +
- "fyTableResponse\0228\n\013createTable\022\023.CreateT" +
- "ableRequest\032\024.CreateTableResponse\022/\n\010shu" +
- "tdown\022\020.ShutdownRequest\032\021.ShutdownRespon",
- "se\0225\n\nstopMaster\022\022.StopMasterRequest\032\023.S" +
- "topMasterResponse\022,\n\007balance\022\017.BalanceRe" +
- "quest\032\020.BalanceResponse\022M\n\022setBalancerRu" +
- "nning\022\032.SetBalancerRunningRequest\032\033.SetB" +
- "alancerRunningResponse\022;\n\016runCatalogScan" +
- "\022\023.CatalogScanRequest\032\024.CatalogScanRespo" +
- "nse\022S\n\024enableCatalogJanitor\022\034.EnableCata" +
- "logJanitorRequest\032\035.EnableCatalogJanitor" +
- "Response\022\\\n\027isCatalogJanitorEnabled\022\037.Is" +
- "CatalogJanitorEnabledRequest\032 .IsCatalog",
- "JanitorEnabledResponseBG\n*org.apache.had" +
- "oop.hbase.protobuf.generatedB\021MasterAdmi" +
- "nProtosH\001\210\001\001\240\001\001"
+ "\n\021MasterAdmin.proto\032\013hbase.proto\032\014Client" +
+ ".proto\"R\n\020AddColumnRequest\022\021\n\ttableName\030" +
+ "\001 \002(\014\022+\n\016columnFamilies\030\002 \002(\0132\023.ColumnFa" +
+ "milySchema\"\023\n\021AddColumnResponse\"<\n\023Delet" +
+ "eColumnRequest\022\021\n\ttableName\030\001 \002(\014\022\022\n\ncol" +
+ "umnName\030\002 \002(\014\"\026\n\024DeleteColumnResponse\"U\n" +
+ "\023ModifyColumnRequest\022\021\n\ttableName\030\001 \002(\014\022" +
+ "+\n\016columnFamilies\030\002 \002(\0132\023.ColumnFamilySc" +
+ "hema\"\026\n\024ModifyColumnResponse\"Z\n\021MoveRegi" +
+ "onRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecif",
+ "ier\022#\n\016destServerName\030\002 \001(\0132\013.ServerName" +
+ "\"\024\n\022MoveRegionResponse\"7\n\023AssignRegionRe" +
+ "quest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\"" +
+ "\026\n\024AssignRegionResponse\"O\n\025UnassignRegio" +
+ "nRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifi" +
+ "er\022\024\n\005force\030\002 \001(\010:\005false\"\030\n\026UnassignRegi" +
+ "onResponse\"8\n\024OfflineRegionRequest\022 \n\006re" +
+ "gion\030\001 \002(\0132\020.RegionSpecifier\"\027\n\025OfflineR" +
+ "egionResponse\"J\n\022CreateTableRequest\022!\n\013t" +
+ "ableSchema\030\001 \002(\0132\014.TableSchema\022\021\n\tsplitK",
+ "eys\030\002 \003(\014\"\025\n\023CreateTableResponse\"\'\n\022Dele" +
+ "teTableRequest\022\021\n\ttableName\030\001 \002(\014\"\025\n\023Del" +
+ "eteTableResponse\"\'\n\022EnableTableRequest\022\021" +
+ "\n\ttableName\030\001 \002(\014\"\025\n\023EnableTableResponse" +
+ "\"(\n\023DisableTableRequest\022\021\n\ttableName\030\001 \002" +
+ "(\014\"\026\n\024DisableTableResponse\"J\n\022ModifyTabl" +
+ "eRequest\022\021\n\ttableName\030\001 \002(\014\022!\n\013tableSche" +
+ "ma\030\002 \002(\0132\014.TableSchema\"\025\n\023ModifyTableRes" +
+ "ponse\"\021\n\017ShutdownRequest\"\022\n\020ShutdownResp" +
+ "onse\"\023\n\021StopMasterRequest\"\024\n\022StopMasterR",
+ "esponse\"\020\n\016BalanceRequest\"&\n\017BalanceResp" +
+ "onse\022\023\n\013balancerRan\030\001 \002(\010\"<\n\031SetBalancer" +
+ "RunningRequest\022\n\n\002on\030\001 \002(\010\022\023\n\013synchronou" +
+ "s\030\002 \001(\010\"6\n\032SetBalancerRunningResponse\022\030\n" +
+ "\020prevBalanceValue\030\001 \001(\010\"\024\n\022CatalogScanRe" +
+ "quest\")\n\023CatalogScanResponse\022\022\n\nscanResu" +
+ "lt\030\001 \001(\005\"-\n\033EnableCatalogJanitorRequest\022" +
+ "\016\n\006enable\030\001 \002(\010\"1\n\034EnableCatalogJanitorR" +
+ "esponse\022\021\n\tprevValue\030\001 \001(\010\" \n\036IsCatalogJ" +
+ "anitorEnabledRequest\"0\n\037IsCatalogJanitor",
+ "EnabledResponse\022\r\n\005value\030\001 \002(\0102\201\n\n\022Maste" +
+ "rAdminService\0222\n\taddColumn\022\021.AddColumnRe" +
+ "quest\032\022.AddColumnResponse\022;\n\014deleteColum" +
+ "n\022\024.DeleteColumnRequest\032\025.DeleteColumnRe" +
+ "sponse\022;\n\014modifyColumn\022\024.ModifyColumnReq" +
+ "uest\032\025.ModifyColumnResponse\0225\n\nmoveRegio" +
+ "n\022\022.MoveRegionRequest\032\023.MoveRegionRespon" +
+ "se\022;\n\014assignRegion\022\024.AssignRegionRequest" +
+ "\032\025.AssignRegionResponse\022A\n\016unassignRegio" +
+ "n\022\026.UnassignRegionRequest\032\027.UnassignRegi",
+ "onResponse\022>\n\rofflineRegion\022\025.OfflineReg" +
+ "ionRequest\032\026.OfflineRegionResponse\0228\n\013de" +
+ "leteTable\022\023.DeleteTableRequest\032\024.DeleteT" +
+ "ableResponse\0228\n\013enableTable\022\023.EnableTabl" +
+ "eRequest\032\024.EnableTableResponse\022;\n\014disabl" +
+ "eTable\022\024.DisableTableRequest\032\025.DisableTa" +
+ "bleResponse\0228\n\013modifyTable\022\023.ModifyTable" +
+ "Request\032\024.ModifyTableResponse\0228\n\013createT" +
+ "able\022\023.CreateTableRequest\032\024.CreateTableR" +
+ "esponse\022/\n\010shutdown\022\020.ShutdownRequest\032\021.",
+ "ShutdownResponse\0225\n\nstopMaster\022\022.StopMas" +
+ "terRequest\032\023.StopMasterResponse\022,\n\007balan" +
+ "ce\022\017.BalanceRequest\032\020.BalanceResponse\022M\n" +
+ "\022setBalancerRunning\022\032.SetBalancerRunning" +
+ "Request\032\033.SetBalancerRunningResponse\022;\n\016" +
+ "runCatalogScan\022\023.CatalogScanRequest\032\024.Ca" +
+ "talogScanResponse\022S\n\024enableCatalogJanito" +
+ "r\022\034.EnableCatalogJanitorRequest\032\035.Enable" +
+ "CatalogJanitorResponse\022\\\n\027isCatalogJanit" +
+ "orEnabled\022\037.IsCatalogJanitorEnabledReque",
+ "st\032 .IsCatalogJanitorEnabledResponse\022L\n\021" +
+ "execMasterService\022\032.CoprocessorServiceRe" +
+ "quest\032\033.CoprocessorServiceResponseBG\n*or" +
+ "g.apache.hadoop.hbase.protobuf.generated" +
+ "B\021MasterAdminProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -16344,6 +16411,7 @@ public final class MasterAdminProtos {
.internalBuildGeneratedFileFrom(descriptorData,
new com.google.protobuf.Descriptors.FileDescriptor[] {
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.getDescriptor(),
+ org.apache.hadoop.hbase.protobuf.generated.ClientProtos.getDescriptor(),
}, assigner);
}
Modified: hbase/trunk/hbase-server/src/main/protobuf/MasterAdmin.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/protobuf/MasterAdmin.proto?rev=1409257&r1=1409256&r2=1409257&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/protobuf/MasterAdmin.proto (original)
+++ hbase/trunk/hbase-server/src/main/protobuf/MasterAdmin.proto Wed Nov 14 17:03:06 2012
@@ -25,6 +25,7 @@ option java_generate_equals_and_hash = t
option optimize_for = SPEED;
import "hbase.proto";
+import "Client.proto";
/* Column-level protobufs */
@@ -273,4 +274,10 @@ service MasterAdminService {
*/
rpc isCatalogJanitorEnabled(IsCatalogJanitorEnabledRequest)
returns(IsCatalogJanitorEnabledResponse);
+
+ /**
+ * Call a master coprocessor endpoint
+ */
+ rpc execMasterService(CoprocessorServiceRequest)
+ returns(CoprocessorServiceResponse);
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java?rev=1409257&r1=1409256&r2=1409257&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java Wed Nov 14 17:03:06 2012
@@ -45,6 +45,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
/**
* TestEndpoint: test cases to verify coprocessor Endpoint
@@ -76,6 +77,8 @@ public class TestCoprocessorEndpoint {
org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(),
org.apache.hadoop.hbase.coprocessor.GenericEndpoint.class.getName(),
ProtobufCoprocessorService.class.getName());
+ conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
+ ProtobufCoprocessorService.class.getName());
util.startMiniCluster(2);
HBaseAdmin admin = new HBaseAdmin(conf);
HTableDescriptor desc = new HTableDescriptor(TEST_TABLE);
@@ -253,6 +256,17 @@ public class TestCoprocessorEndpoint {
}
}
+ /**
+ * Verifies that a coprocessor service loaded on the master (configured through
+ * CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY in the cluster setup) is reachable through
+ * HBaseAdmin#coprocessorService(): an echo request is sent and the reply must carry the
+ * same message back.
+ */
+ @Test
+ public void testMasterCoprocessorService() throws Throwable {
+ // The admin is handed out by the testing utility rather than constructed here, so this
+ // test must not close it: the utility owns the instance and other users of the same
+ // utility would otherwise be left with a closed admin.
+ HBaseAdmin admin = util.getHBaseAdmin();
+ final TestProtos.EchoRequestProto request =
+ TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
+ // The channel returned by coprocessorService() routes the call to the master endpoint.
+ TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
+ TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
+ assertEquals("hello", service.echo(null, request).getMessage());
+ }
+
private static byte[][] makeN(byte[] base, int n) {
byte[][] ret = new byte[n][];
for (int i = 0; i < n; i++) {
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java?rev=1409257&r1=1409256&r2=1409257&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java Wed Nov 14 17:03:06 2012
@@ -30,6 +30,7 @@ import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
+import com.google.protobuf.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -288,6 +289,11 @@ public class TestCatalogJanitor {
public boolean isServerShutdownHandlerEnabled() {
return true;
}
+
+ @Override
+ public boolean registerService(Service instance) { // test stub: the given service is ignored
+ return false; // always reports false; presumably "not registered" - TODO confirm against the overridden declaration
+ }
}
@Test