You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2019/06/28 23:06:15 UTC

[hadoop] 12/50: HDFS-13665. [SBN read] Move RPC response serialization into Server.doResponse(). Contributed by Plamen Jeliazkov.

This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 7e0a71dd7fb204b5911c8ecbd958b2153fdddf18
Author: Plamen Jeliazkov <pl...@gmail.com>
AuthorDate: Wed Jul 11 16:07:05 2018 -0700

    HDFS-13665. [SBN read] Move RPC response serialization into Server.doResponse(). Contributed by Plamen Jeliazkov.
---
 .../main/java/org/apache/hadoop/ipc/Server.java    | 43 +++++++++-------------
 1 file changed, 17 insertions(+), 26 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 7aa8001..32de3d3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -835,15 +835,15 @@ public abstract class Server {
     final Writable rpcRequest;    // Serialized Rpc request from client
     ByteBuffer rpcResponse;       // the response for this call
 
-    private RpcResponseHeaderProto bufferedHeader; // the response header
-    private Writable bufferedRv;                   // the byte response
+    private ResponseParams responseParams; // the response params
+    private Writable rv;                   // the byte response
 
     RpcCall(RpcCall call) {
       super(call);
       this.connection = call.connection;
       this.rpcRequest = call.rpcRequest;
-      this.bufferedRv = call.bufferedRv;
-      this.bufferedHeader = call.bufferedHeader;
+      this.rv = call.rv;
+      this.responseParams = call.responseParams;
     }
 
     RpcCall(Connection connection, int id) {
@@ -864,12 +864,10 @@ public abstract class Server {
       this.rpcRequest = param;
     }
 
-    public void setBufferedHeader(RpcResponseHeaderProto header) {
-      this.bufferedHeader = header;
-    }
-
-    public void setBufferedRv(Writable rv) {
-      this.bufferedRv = rv;
+    void setResponseFields(Writable returnValue,
+                           ResponseParams responseParams) {
+      this.rv = returnValue;
+      this.responseParams = responseParams;
     }
 
     @Override
@@ -903,9 +901,7 @@ public abstract class Server {
         populateResponseParamsOnError(e, responseParams);
       }
       if (!isResponseDeferred()) {
-        setupResponse(this, responseParams.returnStatus,
-            responseParams.detailedErr,
-            value, responseParams.errorClass, responseParams.error);
+        setResponseFields(value, responseParams);
         sendResponse();
       } else {
         if (LOG.isDebugEnabled()) {
@@ -960,13 +956,11 @@ public abstract class Server {
         setupResponse(call,
             RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER,
             null, t.getClass().getName(), StringUtils.stringifyException(t));
-      } else if (alignmentContext != null) {
-        // rebuild response with state context in header
-        RpcResponseHeaderProto.Builder responseHeader =
-            call.bufferedHeader.toBuilder();
-        alignmentContext.updateResponseState(responseHeader);
-        RpcResponseHeaderProto builtHeader = responseHeader.build();
-        setupResponse(call, builtHeader, call.bufferedRv);
+      } else {
+        setupResponse(call, call.responseParams.returnStatus,
+            call.responseParams.detailedErr, call.rv,
+            call.responseParams.errorClass,
+            call.responseParams.error);
       }
       connection.sendResponse(call);
     }
@@ -2956,6 +2950,9 @@ public abstract class Server {
     headerBuilder.setRetryCount(call.retryCount);
     headerBuilder.setStatus(status);
     headerBuilder.setServerIpcVersionNum(CURRENT_VERSION);
+    if (alignmentContext != null) {
+      alignmentContext.updateResponseState(headerBuilder);
+    }
 
     if (status == RpcStatusProto.SUCCESS) {
       RpcResponseHeaderProto header = headerBuilder.build();
@@ -2982,12 +2979,6 @@ public abstract class Server {
 
   private void setupResponse(RpcCall call,
       RpcResponseHeaderProto header, Writable rv) throws IOException {
-    if (alignmentContext != null && call.bufferedHeader == null
-        && call.bufferedRv == null) {
-      call.setBufferedHeader(header);
-      call.setBufferedRv(rv);
-    }
-
     final byte[] response;
     if (rv == null || (rv instanceof RpcWritable.ProtobufWrapper)) {
       response = setupResponseForProtobuf(header, rv);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org