You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2011/10/28 23:55:06 UTC
svn commit: r1190611 [1/3] - in
/hadoop/common/trunk/hadoop-common-project/hadoop-common: ./ dev-support/
src/main/java/org/apache/hadoop/ipc/
src/main/java/org/apache/hadoop/ipc/protobuf/ src/proto/
src/test/java/org/apache/hadoop/ipc/ src/test/java/o...
Author: suresh
Date: Fri Oct 28 21:55:05 2011
New Revision: 1190611
URL: http://svn.apache.org/viewvc?rev=1190611&view=rev
Log:
HADOOP-7773. Add support for protocol buffer based RPC engine. Contributed by Suresh Srinivas.
Added:
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protobuf/
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protobuf/HadoopRpcProtos.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/proto/
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/proto/hadoop_rpc.proto
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/protobuf/
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/protobuf/TestProtos.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/protobuf/TestRpcServiceProtos.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/proto/
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/proto/test.proto
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
hadoop/common/trunk/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcServerException.java
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1190611&r1=1190610&r2=1190611&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Fri Oct 28 21:55:05 2011
@@ -89,6 +89,9 @@ Trunk (unreleased changes)
HADOOP-7761. Improve the performance of raw comparisons. (todd)
+ HADOOP-7773. Add support for protocol buffer based RPC engine.
+ (suresh)
+
Release 0.23.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml?rev=1190611&r1=1190610&r2=1190611&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml Fri Oct 28 21:55:05 2011
@@ -270,4 +270,8 @@
<!-- backward compatibility -->
<Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/>
</Match>
+ <Match>
+ <!-- protobuf generated code -->
+ <Class name="org.apache.hadoop.ipc.protobuf.HadoopRpcProtos"/>
+ </Match>
</FindBugsFilter>
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java?rev=1190611&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java Fri Oct 28 21:55:05 2011
@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.net.SocketFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcExceptionProto;
+import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto;
+import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcResponseProto;
+import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcResponseProto.ResponseStatus;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.ServiceException;
+
+/**
+ * RPC Engine for for protobuf based RPCs.
+ */
+@InterfaceStability.Evolving
+public class ProtobufRpcEngine implements RpcEngine {
+ private static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
+
+ private static final ClientCache CLIENTS = new ClientCache();
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+ InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+ SocketFactory factory, int rpcTimeout) throws IOException {
+
+ return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(protocol
+ .getClassLoader(), new Class[] { protocol }, new Invoker(protocol,
+ addr, ticket, conf, factory, rpcTimeout)), false);
+ }
+
+ private static class Invoker implements InvocationHandler, Closeable {
+ private Map<String, Message> returnTypes = new ConcurrentHashMap<String, Message>();
+ private boolean isClosed = false;
+ private Client.ConnectionId remoteId;
+ private Client client;
+
+ public Invoker(Class<?> protocol, InetSocketAddress addr,
+ UserGroupInformation ticket, Configuration conf, SocketFactory factory,
+ int rpcTimeout) throws IOException {
+ this.remoteId = Client.ConnectionId.getConnectionId(addr, protocol,
+ ticket, rpcTimeout, conf);
+ this.client = CLIENTS.getClient(conf, factory,
+ RpcResponseWritable.class);
+ }
+
+ private HadoopRpcRequestProto constructRpcRequest(Method method,
+ Object[] params) throws ServiceException {
+ HadoopRpcRequestProto rpcRequest;
+ HadoopRpcRequestProto.Builder builder = HadoopRpcRequestProto
+ .newBuilder();
+ builder.setMethodName(method.getName());
+
+ if (params.length != 2) { // RpcController + Message
+ throw new ServiceException("Too many parameters for request. Method: ["
+ + method.getName() + "]" + ", Expected: 2, Actual: "
+ + params.length);
+ }
+ if (params[1] == null) {
+ throw new ServiceException("null param while calling Method: ["
+ + method.getName() + "]");
+ }
+
+ Message param = (Message) params[1];
+ builder.setRequest(param.toByteString());
+ rpcRequest = builder.build();
+ return rpcRequest;
+ }
+
+ /**
+ * This is the client side invoker of RPC method. It only throws
+ * ServiceException, since the invocation proxy expects only
+ * ServiceException to be thrown by the method in case protobuf service.
+ *
+ * ServiceException has the following causes:
+ * <ol>
+ * <li>Exceptions encountered in this methods are thrown as
+ * RpcClientException, wrapped in RemoteException</li>
+ * <li>Remote exceptions are thrown wrapped in RemoteException</li>
+ * </ol>
+ *
+ * Note that the client calling protobuf RPC methods, must handle
+ * ServiceException by getting the cause from the ServiceException. If the
+ * cause is RemoteException, then unwrap it to get the exception thrown by
+ * the server.
+ */
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args)
+ throws ServiceException {
+ long startTime = 0;
+ if (LOG.isDebugEnabled()) {
+ startTime = System.currentTimeMillis();
+ }
+
+ HadoopRpcRequestProto rpcRequest = constructRpcRequest(method, args);
+ RpcResponseWritable val = null;
+ try {
+ val = (RpcResponseWritable) client.call(
+ new RpcRequestWritable(rpcRequest), remoteId);
+ } catch (Exception e) {
+ RpcClientException ce = new RpcClientException("Client exception", e);
+ throw new ServiceException(getRemoteException(ce));
+ }
+
+ HadoopRpcResponseProto response = val.message;
+ if (LOG.isDebugEnabled()) {
+ long callTime = System.currentTimeMillis() - startTime;
+ LOG.debug("Call: " + method.getName() + " " + callTime);
+ }
+
+ // Wrap the received message
+ ResponseStatus status = response.getStatus();
+ if (status != ResponseStatus.SUCCESS) {
+ RemoteException re = new RemoteException(response.getException()
+ .getExceptionName(), response.getException().getStackTrace());
+ re.fillInStackTrace();
+ throw new ServiceException(re);
+ }
+
+ Message prototype = null;
+ try {
+ prototype = getReturnProtoType(method);
+ } catch (Exception e) {
+ throw new ServiceException(e);
+ }
+ Message returnMessage;
+ try {
+ returnMessage = prototype.newBuilderForType()
+ .mergeFrom(response.getResponse()).build();
+ } catch (InvalidProtocolBufferException e) {
+ RpcClientException ce = new RpcClientException("Client exception", e);
+ throw new ServiceException(getRemoteException(ce));
+ }
+ return returnMessage;
+ }
+
+ public void close() throws IOException {
+ if (!isClosed) {
+ isClosed = true;
+ CLIENTS.stopClient(client);
+ }
+ }
+
+ private Message getReturnProtoType(Method method) throws Exception {
+ if (returnTypes.containsKey(method.getName())) {
+ return returnTypes.get(method.getName());
+ }
+
+ Class<?> returnType = method.getReturnType();
+ Method newInstMethod = returnType.getMethod("getDefaultInstance");
+ newInstMethod.setAccessible(true);
+ Message prototype = (Message) newInstMethod.invoke(null, (Object[]) null);
+ returnTypes.put(method.getName(), prototype);
+ return prototype;
+ }
+ }
+
+ @Override
+ public Object[] call(Method method, Object[][] params,
+ InetSocketAddress[] addrs, UserGroupInformation ticket, Configuration conf) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Writable Wrapper for Protocol Buffer Requests
+ */
+ private static class RpcRequestWritable implements Writable {
+ HadoopRpcRequestProto message;
+
+ @SuppressWarnings("unused")
+ public RpcRequestWritable() {
+ }
+
+ RpcRequestWritable(HadoopRpcRequestProto message) {
+ this.message = message;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(message.toByteArray().length);
+ out.write(message.toByteArray());
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int length = in.readInt();
+ byte[] bytes = new byte[length];
+ in.readFully(bytes);
+ message = HadoopRpcRequestProto.parseFrom(bytes);
+ }
+ }
+
+ /**
+ * Writable Wrapper for Protocol Buffer Responses
+ */
+ private static class RpcResponseWritable implements Writable {
+ HadoopRpcResponseProto message;
+
+ @SuppressWarnings("unused")
+ public RpcResponseWritable() {
+ }
+
+ public RpcResponseWritable(HadoopRpcResponseProto message) {
+ this.message = message;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(message.toByteArray().length);
+ out.write(message.toByteArray());
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int length = in.readInt();
+ byte[] bytes = new byte[length];
+ in.readFully(bytes);
+ message = HadoopRpcResponseProto.parseFrom(bytes);
+ }
+ }
+
+ @VisibleForTesting
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ static Client getClient(Configuration conf) {
+ return CLIENTS.getClient(conf, SocketFactory.getDefault(),
+ RpcResponseWritable.class);
+ }
+
+
+ @Override
+ public RPC.Server getServer(Class<?> protocol, Object instance,
+ String bindAddress, int port, int numHandlers, int numReaders,
+ int queueSizePerHandler, boolean verbose, Configuration conf,
+ SecretManager<? extends TokenIdentifier> secretManager)
+ throws IOException {
+ return new Server(instance, conf, bindAddress, port, numHandlers,
+ numReaders, queueSizePerHandler, verbose, secretManager);
+ }
+
+ private static RemoteException getRemoteException(Exception e) {
+ return new RemoteException(e.getClass().getName(),
+ StringUtils.stringifyException(e));
+ }
+
+ public static class Server extends RPC.Server {
+ private BlockingService service;
+ private boolean verbose;
+
+ private static String classNameBase(String className) {
+ String[] names = className.split("\\.", -1);
+ if (names == null || names.length == 0) {
+ return className;
+ }
+ return names[names.length - 1];
+ }
+
+ /**
+ * Construct an RPC server.
+ *
+ * @param instance the instance whose methods will be called
+ * @param conf the configuration to use
+ * @param bindAddress the address to bind on to listen for connection
+ * @param port the port to listen for connections on
+ * @param numHandlers the number of method handler threads to run
+ * @param verbose whether each call should be logged
+ */
+ public Server(Object instance, Configuration conf, String bindAddress,
+ int port, int numHandlers, int numReaders, int queueSizePerHandler,
+ boolean verbose, SecretManager<? extends TokenIdentifier> secretManager)
+ throws IOException {
+ super(bindAddress, port, RpcRequestWritable.class, numHandlers,
+ numReaders, queueSizePerHandler, conf, classNameBase(instance
+ .getClass().getName()), secretManager);
+ this.service = (BlockingService) instance;
+ this.verbose = verbose;
+ }
+
+ /**
+ * This is a server side method, which is invoked over RPC. On success
+ * the return response has protobuf response payload. On failure, the
+ * exception name and the stack trace are return in the resposne. See {@link HadoopRpcResponseProto}
+ *
+ * In this method there three types of exceptions possible and they are
+ * returned in response as follows.
+ * <ol>
+ * <li> Exceptions encountered in this method that are returned as {@link RpcServerException} </li>
+ * <li> Exceptions thrown by the service is wrapped in ServiceException. In that
+ * this method returns in response the exception thrown by the service.</li>
+ * <li> Other exceptions thrown by the service. They are returned as
+ * it is.</li>
+ * </ol>
+ */
+ @Override
+ public Writable call(String protocol, Writable writableRequest,
+ long receiveTime) throws IOException {
+ RpcRequestWritable request = (RpcRequestWritable) writableRequest;
+ HadoopRpcRequestProto rpcRequest = request.message;
+ String methodName = rpcRequest.getMethodName();
+ if (verbose)
+ LOG.info("Call: protocol=" + protocol + ", method=" + methodName);
+ MethodDescriptor methodDescriptor = service.getDescriptorForType()
+ .findMethodByName(methodName);
+ if (methodDescriptor == null) {
+ String msg = "Unknown method " + methodName + " called on " + protocol
+ + " protocol.";
+ LOG.warn(msg);
+ return handleException(new RpcServerException(msg));
+ }
+ Message prototype = service.getRequestPrototype(methodDescriptor);
+ Message param = prototype.newBuilderForType()
+ .mergeFrom(rpcRequest.getRequest()).build();
+ Message result;
+ try {
+ result = service.callBlockingMethod(methodDescriptor, null, param);
+ } catch (ServiceException e) {
+ Throwable cause = e.getCause();
+ return handleException(cause != null ? cause : e);
+ } catch (Exception e) {
+ return handleException(e);
+ }
+
+ HadoopRpcResponseProto response = constructProtoSpecificRpcSuccessResponse(result);
+ return new RpcResponseWritable(response);
+ }
+
+ private RpcResponseWritable handleException(Throwable e) {
+ HadoopRpcExceptionProto exception = HadoopRpcExceptionProto.newBuilder()
+ .setExceptionName(e.getClass().getName())
+ .setStackTrace(StringUtils.stringifyException(e)).build();
+ HadoopRpcResponseProto response = HadoopRpcResponseProto.newBuilder()
+ .setStatus(ResponseStatus.ERRROR).setException(exception).build();
+ return new RpcResponseWritable(response);
+ }
+
+ private HadoopRpcResponseProto constructProtoSpecificRpcSuccessResponse(
+ Message message) {
+ HadoopRpcResponseProto res = HadoopRpcResponseProto.newBuilder()
+ .setResponse(message.toByteString())
+ .setStatus(ResponseStatus.SUCCESS)
+ .build();
+ return res;
+ }
+ }
+}
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcServerException.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcServerException.java?rev=1190611&r1=1190610&r2=1190611&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcServerException.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcServerException.java Fri Oct 28 21:55:05 2011
@@ -25,10 +25,9 @@ public class RpcServerException extends
/**
* Constructs exception with the specified detail message.
- *
- * @param messages detailed message.
+ * @param message detailed message.
*/
- RpcServerException(final String message) {
+ public RpcServerException(final String message) {
super(message);
}
@@ -36,12 +35,11 @@ public class RpcServerException extends
* Constructs exception with the specified detail message and cause.
*
* @param message message.
- * @param cause that cause this exception
* @param cause the cause (can be retried by the {@link #getCause()} method).
* (A <tt>null</tt> value is permitted, and indicates that the cause
* is nonexistent or unknown.)
*/
- RpcServerException(final String message, final Throwable cause) {
+ public RpcServerException(final String message, final Throwable cause) {
super(message, cause);
}
}