You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2012/03/23 19:27:18 UTC

svn commit: r1304546 - in /hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common: ./ src/main/java/ src/main/java/org/apache/hadoop/ipc/ src/main/proto/

Author: szetszwo
Date: Fri Mar 23 18:27:17 2012
New Revision: 1304546

URL: http://svn.apache.org/viewvc?rev=1304546&view=rev
Log:
svn merge -c 1304542 from trunk for HADOOP-8184.

Modified:
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/   (props changed)
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/   (props changed)
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/proto/hadoop_rpc.proto

Propchange: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common:r1304542

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1304546&r1=1304545&r2=1304546&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt Fri Mar 23 18:27:17 2012
@@ -100,6 +100,9 @@ Release 0.23.3 - UNRELEASED
 
     HADOOP-8200. Remove HADOOP_[JOBTRACKER|TASKTRACKER]_OPTS. (eli)
 
+    HADOOP-8184.  ProtoBuf RPC engine uses the IPC layer reply packet.
+    (Sanjay Radia via szetszwo)
+
   OPTIMIZATIONS
 
   BUG FIXES

Propchange: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:r1304542

Propchange: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:r1304542

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java?rev=1304546&r1=1304545&r2=1304546&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java Fri Mar 23 18:27:17 2012
@@ -39,15 +39,12 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
-import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcExceptionProto;
+
 import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto;
-import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcResponseProto;
-import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcResponseProto.ResponseStatus;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.ProtoUtil;
-import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.BlockingService;
@@ -191,21 +188,11 @@ public class ProtobufRpcEngine implement
         throw new ServiceException(e);
       }
 
-      HadoopRpcResponseProto response = val.message;
       if (LOG.isDebugEnabled()) {
         long callTime = System.currentTimeMillis() - startTime;
         LOG.debug("Call: " + method.getName() + " " + callTime);
       }
-
-      // Wrap the received message
-      ResponseStatus status = response.getStatus();
-      if (status != ResponseStatus.SUCCESS) {
-        RemoteException re =  new RemoteException(response.getException()
-            .getExceptionName(), response.getException().getStackTrace());
-        re.fillInStackTrace();
-        throw new ServiceException(re);
-      }
-
+      
       Message prototype = null;
       try {
         prototype = getReturnProtoType(method);
@@ -215,7 +202,7 @@ public class ProtobufRpcEngine implement
       Message returnMessage;
       try {
         returnMessage = prototype.newBuilderForType()
-            .mergeFrom(response.getResponse()).build();
+            .mergeFrom(val.responseMessage).build();
       } catch (Throwable e) {
         throw new ServiceException(e);
       }
@@ -287,28 +274,28 @@ public class ProtobufRpcEngine implement
    * Writable Wrapper for Protocol Buffer Responses
    */
   private static class RpcResponseWritable implements Writable {
-    HadoopRpcResponseProto message;
+    byte[] responseMessage;
 
     @SuppressWarnings("unused")
     public RpcResponseWritable() {
     }
 
-    public RpcResponseWritable(HadoopRpcResponseProto message) {
-      this.message = message;
+    public RpcResponseWritable(Message message) {
+      this.responseMessage = message.toByteArray();
     }
 
     @Override
     public void write(DataOutput out) throws IOException {
-      ((Message)message).writeDelimitedTo(
-          DataOutputOutputStream.constructOutputStream(out));      
+      out.writeInt(responseMessage.length);
+      out.write(responseMessage);     
     }
 
     @Override
     public void readFields(DataInput in) throws IOException {
-      int length = ProtoUtil.readRawVarint32(in);
+      int length = in.readInt();
       byte[] bytes = new byte[length];
       in.readFully(bytes);
-      message = HadoopRpcResponseProto.parseFrom(bytes);
+      responseMessage = bytes;
     }
   }
 
@@ -356,24 +343,6 @@ public class ProtobufRpcEngine implement
       registerProtocolAndImpl(RpcKind.RPC_PROTOCOL_BUFFER, protocolClass,
           protocolImpl);
     }
-
-    private static RpcResponseWritable handleException(Throwable e) {
-      HadoopRpcExceptionProto exception = HadoopRpcExceptionProto.newBuilder()
-          .setExceptionName(e.getClass().getName())
-          .setStackTrace(StringUtils.stringifyException(e)).build();
-      HadoopRpcResponseProto response = HadoopRpcResponseProto.newBuilder()
-          .setStatus(ResponseStatus.ERRROR).setException(exception).build();
-      return new RpcResponseWritable(response);
-    }
-
-    private static HadoopRpcResponseProto constructProtoSpecificRpcSuccessResponse(
-        Message message) {
-      HadoopRpcResponseProto res = HadoopRpcResponseProto.newBuilder()
-          .setResponse(message.toByteString())
-          .setStatus(ResponseStatus.SUCCESS)
-          .build();
-      return res;
-    }
     
     /**
      * Protobuf invoker for {@link RpcInvoker}
@@ -418,7 +387,7 @@ public class ProtobufRpcEngine implement
        * </ol>
        */
       public Writable call(RPC.Server server, String protocol,
-          Writable writableRequest, long receiveTime) throws IOException {
+          Writable writableRequest, long receiveTime) throws Exception {
         RpcRequestWritable request = (RpcRequestWritable) writableRequest;
         HadoopRpcRequestProto rpcRequest = request.message;
         String methodName = rpcRequest.getMethodName();
@@ -436,7 +405,7 @@ public class ProtobufRpcEngine implement
           String msg = "Unknown method " + methodName + " called on " + protocol
               + " protocol.";
           LOG.warn(msg);
-          return handleException(new RpcServerException(msg));
+          throw new RpcServerException(msg);
         }
         Message prototype = service.getRequestPrototype(methodDescriptor);
         Message param = prototype.newBuilderForType()
@@ -457,14 +426,11 @@ public class ProtobufRpcEngine implement
           server.rpcDetailedMetrics.addProcessingTime(methodName,
               processingTime);
         } catch (ServiceException e) {
-          Throwable cause = e.getCause();
-          return handleException(cause != null ? cause : e);
+          throw (Exception) e.getCause();
         } catch (Exception e) {
-          return handleException(e);
+          throw e;
         }
-  
-        HadoopRpcResponseProto response = constructProtoSpecificRpcSuccessResponse(result);
-        return new RpcResponseWritable(response);
+        return new RpcResponseWritable(result);
       }
     }
   }

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java?rev=1304546&r1=1304545&r2=1304546&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java Fri Mar 23 18:27:17 2012
@@ -85,7 +85,7 @@ public class RPC {
      * @throws IOException
      **/
     public Writable call(Server server, String protocol,
-        Writable rpcRequest, long receiveTime) throws IOException ;
+        Writable rpcRequest, long receiveTime) throws Exception ;
   }
   
   static final Log LOG = LogFactory.getLog(RPC.class);
@@ -880,7 +880,7 @@ public class RPC {
     
     @Override
     public Writable call(RpcKind rpcKind, String protocol,
-        Writable rpcRequest, long receiveTime) throws IOException {
+        Writable rpcRequest, long receiveTime) throws Exception {
       return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
           receiveTime);
     }

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1304546&r1=1304545&r2=1304546&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Fri Mar 23 18:27:17 2012
@@ -1952,13 +1952,13 @@ public abstract class Server {
    *  Writable, long)} instead
    */
   @Deprecated
-  public Writable call(Writable param, long receiveTime) throws IOException {
+  public Writable call(Writable param, long receiveTime) throws Exception {
     return call(RpcKind.RPC_BUILTIN, null, param, receiveTime);
   }
   
   /** Called for each call. */
   public abstract Writable call(RpcKind rpcKind, String protocol,
-      Writable param, long receiveTime) throws IOException;
+      Writable param, long receiveTime) throws Exception;
   
   /**
    * Authorize the incoming client connection.

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/proto/hadoop_rpc.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/proto/hadoop_rpc.proto?rev=1304546&r1=1304545&r2=1304546&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/proto/hadoop_rpc.proto (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/proto/hadoop_rpc.proto Fri Mar 23 18:27:17 2012
@@ -25,8 +25,11 @@ option java_outer_classname = "HadoopRpc
 option java_generate_equals_and_hash = true;
 
 /**
- * Message used to marshal the client request
+ * This message is used for Protobuf Rpc Engine.
+ * The message is used to marshal a Rpc-request
  * from RPC client to the RPC server.
+ * The response to the Rpc call (including errors) is handled
+ * as part of the standard Rpc response.
  */
 message HadoopRpcRequestProto {
   /** Name of the RPC method */
@@ -41,39 +44,3 @@ message HadoopRpcRequestProto {
   /** protocol version of class declaring the called method */
   required uint64 clientProtocolVersion = 4;
 }
-
-/**
- * At the RPC layer, this message is used to indicate
- * the server side exception the the RPC client.
- *
- * Hadoop RPC client throws an exception indicated
- * by exceptionName with the stackTrace.
- */
-message HadoopRpcExceptionProto {
-  /** Class name of the exception thrown from the server */
-
-  optional string exceptionName = 1;
-  /** Exception stack trace from the server side */
-  optional string stackTrace = 2;
-}
-
-/**
- * This message is used to marshal the response from
- * RPC server to the client.
- */
-message HadoopRpcResponseProto {
-  /** Status of IPC call */
-  enum ResponseStatus {
-    SUCCESS = 1;
-    ERRROR = 2;
-  }
-
-  required ResponseStatus status = 1;
-
-  // Protobuf response payload from the server, when status is SUCCESS.
-  optional bytes response = 2;
-
-  // Exception when status is ERROR or FATAL
-  optional HadoopRpcExceptionProto exception = 3;
-}
-