You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2012/03/23 19:27:18 UTC
svn commit: r1304546 - in
/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common: ./
src/main/java/ src/main/java/org/apache/hadoop/ipc/ src/main/proto/
Author: szetszwo
Date: Fri Mar 23 18:27:17 2012
New Revision: 1304546
URL: http://svn.apache.org/viewvc?rev=1304546&view=rev
Log:
svn merge -c 1304542 from trunk for HADOOP-8184.
Modified:
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/ (props changed)
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt (contents, props changed)
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/ (props changed)
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/proto/hadoop_rpc.proto
Propchange: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common:r1304542
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1304546&r1=1304545&r2=1304546&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt Fri Mar 23 18:27:17 2012
@@ -100,6 +100,9 @@ Release 0.23.3 - UNRELEASED
HADOOP-8200. Remove HADOOP_[JOBTRACKER|TASKTRACKER]_OPTS. (eli)
+ HADOOP-8184. ProtoBuf RPC engine uses the IPC layer reply packet.
+ (Sanjay Radia via szetszwo)
+
OPTIMIZATIONS
BUG FIXES
Propchange: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:r1304542
Propchange: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:r1304542
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java?rev=1304546&r1=1304545&r2=1304546&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java Fri Mar 23 18:27:17 2012
@@ -39,15 +39,12 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
-import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcExceptionProto;
+
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto;
-import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcResponseProto;
-import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcResponseProto.ResponseStatus;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ProtoUtil;
-import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
@@ -191,21 +188,11 @@ public class ProtobufRpcEngine implement
throw new ServiceException(e);
}
- HadoopRpcResponseProto response = val.message;
if (LOG.isDebugEnabled()) {
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
}
-
- // Wrap the received message
- ResponseStatus status = response.getStatus();
- if (status != ResponseStatus.SUCCESS) {
- RemoteException re = new RemoteException(response.getException()
- .getExceptionName(), response.getException().getStackTrace());
- re.fillInStackTrace();
- throw new ServiceException(re);
- }
-
+
Message prototype = null;
try {
prototype = getReturnProtoType(method);
@@ -215,7 +202,7 @@ public class ProtobufRpcEngine implement
Message returnMessage;
try {
returnMessage = prototype.newBuilderForType()
- .mergeFrom(response.getResponse()).build();
+ .mergeFrom(val.responseMessage).build();
} catch (Throwable e) {
throw new ServiceException(e);
}
@@ -287,28 +274,28 @@ public class ProtobufRpcEngine implement
* Writable Wrapper for Protocol Buffer Responses
*/
private static class RpcResponseWritable implements Writable {
- HadoopRpcResponseProto message;
+ byte[] responseMessage;
@SuppressWarnings("unused")
public RpcResponseWritable() {
}
- public RpcResponseWritable(HadoopRpcResponseProto message) {
- this.message = message;
+ public RpcResponseWritable(Message message) {
+ this.responseMessage = message.toByteArray();
}
@Override
public void write(DataOutput out) throws IOException {
- ((Message)message).writeDelimitedTo(
- DataOutputOutputStream.constructOutputStream(out));
+ out.writeInt(responseMessage.length);
+ out.write(responseMessage);
}
@Override
public void readFields(DataInput in) throws IOException {
- int length = ProtoUtil.readRawVarint32(in);
+ int length = in.readInt();
byte[] bytes = new byte[length];
in.readFully(bytes);
- message = HadoopRpcResponseProto.parseFrom(bytes);
+ responseMessage = bytes;
}
}
@@ -356,24 +343,6 @@ public class ProtobufRpcEngine implement
registerProtocolAndImpl(RpcKind.RPC_PROTOCOL_BUFFER, protocolClass,
protocolImpl);
}
-
- private static RpcResponseWritable handleException(Throwable e) {
- HadoopRpcExceptionProto exception = HadoopRpcExceptionProto.newBuilder()
- .setExceptionName(e.getClass().getName())
- .setStackTrace(StringUtils.stringifyException(e)).build();
- HadoopRpcResponseProto response = HadoopRpcResponseProto.newBuilder()
- .setStatus(ResponseStatus.ERRROR).setException(exception).build();
- return new RpcResponseWritable(response);
- }
-
- private static HadoopRpcResponseProto constructProtoSpecificRpcSuccessResponse(
- Message message) {
- HadoopRpcResponseProto res = HadoopRpcResponseProto.newBuilder()
- .setResponse(message.toByteString())
- .setStatus(ResponseStatus.SUCCESS)
- .build();
- return res;
- }
/**
* Protobuf invoker for {@link RpcInvoker}
@@ -418,7 +387,7 @@ public class ProtobufRpcEngine implement
* </ol>
*/
public Writable call(RPC.Server server, String protocol,
- Writable writableRequest, long receiveTime) throws IOException {
+ Writable writableRequest, long receiveTime) throws Exception {
RpcRequestWritable request = (RpcRequestWritable) writableRequest;
HadoopRpcRequestProto rpcRequest = request.message;
String methodName = rpcRequest.getMethodName();
@@ -436,7 +405,7 @@ public class ProtobufRpcEngine implement
String msg = "Unknown method " + methodName + " called on " + protocol
+ " protocol.";
LOG.warn(msg);
- return handleException(new RpcServerException(msg));
+ throw new RpcServerException(msg);
}
Message prototype = service.getRequestPrototype(methodDescriptor);
Message param = prototype.newBuilderForType()
@@ -457,14 +426,11 @@ public class ProtobufRpcEngine implement
server.rpcDetailedMetrics.addProcessingTime(methodName,
processingTime);
} catch (ServiceException e) {
- Throwable cause = e.getCause();
- return handleException(cause != null ? cause : e);
+ throw (Exception) e.getCause();
} catch (Exception e) {
- return handleException(e);
+ throw e;
}
-
- HadoopRpcResponseProto response = constructProtoSpecificRpcSuccessResponse(result);
- return new RpcResponseWritable(response);
+ return new RpcResponseWritable(result);
}
}
}
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java?rev=1304546&r1=1304545&r2=1304546&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java Fri Mar 23 18:27:17 2012
@@ -85,7 +85,7 @@ public class RPC {
* @throws IOException
**/
public Writable call(Server server, String protocol,
- Writable rpcRequest, long receiveTime) throws IOException ;
+ Writable rpcRequest, long receiveTime) throws Exception ;
}
static final Log LOG = LogFactory.getLog(RPC.class);
@@ -880,7 +880,7 @@ public class RPC {
@Override
public Writable call(RpcKind rpcKind, String protocol,
- Writable rpcRequest, long receiveTime) throws IOException {
+ Writable rpcRequest, long receiveTime) throws Exception {
return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
receiveTime);
}
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1304546&r1=1304545&r2=1304546&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Fri Mar 23 18:27:17 2012
@@ -1952,13 +1952,13 @@ public abstract class Server {
* Writable, long)} instead
*/
@Deprecated
- public Writable call(Writable param, long receiveTime) throws IOException {
+ public Writable call(Writable param, long receiveTime) throws Exception {
return call(RpcKind.RPC_BUILTIN, null, param, receiveTime);
}
/** Called for each call. */
public abstract Writable call(RpcKind rpcKind, String protocol,
- Writable param, long receiveTime) throws IOException;
+ Writable param, long receiveTime) throws Exception;
/**
* Authorize the incoming client connection.
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/proto/hadoop_rpc.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/proto/hadoop_rpc.proto?rev=1304546&r1=1304545&r2=1304546&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/proto/hadoop_rpc.proto (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/proto/hadoop_rpc.proto Fri Mar 23 18:27:17 2012
@@ -25,8 +25,11 @@ option java_outer_classname = "HadoopRpc
option java_generate_equals_and_hash = true;
/**
- * Message used to marshal the client request
+ * This message is used for Protobuf Rpc Engine.
+ * The message is used to marshal a Rpc-request
* from RPC client to the RPC server.
+ * The Response to the Rpc call (including errors) are handled
+ * as part of the standard Rpc response.
*/
message HadoopRpcRequestProto {
/** Name of the RPC method */
@@ -41,39 +44,3 @@ message HadoopRpcRequestProto {
/** protocol version of class declaring the called method */
required uint64 clientProtocolVersion = 4;
}
-
-/**
- * At the RPC layer, this message is used to indicate
- * the server side exception the the RPC client.
- *
- * Hadoop RPC client throws an exception indicated
- * by exceptionName with the stackTrace.
- */
-message HadoopRpcExceptionProto {
- /** Class name of the exception thrown from the server */
-
- optional string exceptionName = 1;
- /** Exception stack trace from the server side */
- optional string stackTrace = 2;
-}
-
-/**
- * This message is used to marshal the response from
- * RPC server to the client.
- */
-message HadoopRpcResponseProto {
- /** Status of IPC call */
- enum ResponseStatus {
- SUCCESS = 1;
- ERRROR = 2;
- }
-
- required ResponseStatus status = 1;
-
- // Protobuf response payload from the server, when status is SUCCESS.
- optional bytes response = 2;
-
- // Exception when status is ERROR or FATAL
- optional HadoopRpcExceptionProto exception = 3;
-}
-