You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/16 07:55:13 UTC

[rocketmq] 02/02: Rename the rpc header

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 6381e6e19fd7658097923d0f9d1663b97ad4efe0
Author: dongeforever <do...@apache.org>
AuthorDate: Tue Nov 16 15:54:58 2021 +0800

    Rename the rpc header
---
 .../GetEarliestMsgStoretimeResponseHeader.java     |  3 +-
 .../header/GetMinOffsetResponseHeader.java         |  3 +-
 .../protocol/header/PullMessageResponseHeader.java |  3 +-
 .../header/SearchOffsetResponseHeader.java         |  3 +-
 .../apache/rocketmq/common/rpc/RequestBuilder.java |  6 ++--
 .../apache/rocketmq/common/rpc/RpcClientImpl.java  | 32 +++++++++-----------
 .../apache/rocketmq/common/rpc/RpcClientUtils.java |  4 +--
 .../RpcHeader.java}                                | 34 ++++++++++++----------
 .../org/apache/rocketmq/common/rpc/RpcRequest.java |  6 ++--
 ...{CommonRpcHeader.java => RpcRequestHeader.java} | 15 +---------
 .../apache/rocketmq/common/rpc/RpcResponse.java    | 22 +++++++-------
 .../common/rpc/TopicQueueRequestHeader.java        |  2 +-
 .../remoting/protocol/RemotingCommand.java         |  4 +--
 13 files changed, 63 insertions(+), 74 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java
index 6b9b3b2..10e7d82 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java
@@ -20,11 +20,12 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
+import org.apache.rocketmq.common.rpc.RpcHeader;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class GetEarliestMsgStoretimeResponseHeader implements CommandCustomHeader {
+public class GetEarliestMsgStoretimeResponseHeader extends RpcHeader {
     @CFNotNull
     private Long timestamp;
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java
index 6fc0fac..57cb050 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java
@@ -20,11 +20,12 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
+import org.apache.rocketmq.common.rpc.RpcHeader;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class GetMinOffsetResponseHeader implements CommandCustomHeader {
+public class GetMinOffsetResponseHeader extends RpcHeader {
     @CFNotNull
     private Long offset;
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
index 88af984..e2f896b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
@@ -20,12 +20,13 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
+import org.apache.rocketmq.common.rpc.RpcHeader;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class PullMessageResponseHeader implements CommandCustomHeader {
+public class PullMessageResponseHeader extends RpcHeader {
     @CFNotNull
     private Long suggestWhichBrokerId;
     @CFNotNull
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java
index f88ac68..416f7f7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java
@@ -20,11 +20,12 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
+import org.apache.rocketmq.common.rpc.RpcHeader;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class SearchOffsetResponseHeader implements CommandCustomHeader {
+public class SearchOffsetResponseHeader extends RpcHeader {
     @CFNotNull
     private Long offset;
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
index 7655871..a791543 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
@@ -14,17 +14,17 @@ public class RequestBuilder {
         requestCodeMap.put(RequestCode.PULL_MESSAGE, PullMessageRequestHeader.class);
     }
 
-    public static CommonRpcHeader buildCommonRpcHeader(int requestCode, String destBrokerName) {
+    public static RpcRequestHeader buildCommonRpcHeader(int requestCode, String destBrokerName) {
         return buildCommonRpcHeader(requestCode, null, destBrokerName);
     }
 
-    public static CommonRpcHeader buildCommonRpcHeader(int requestCode, Boolean oneway, String destBrokerName) {
+    public static RpcRequestHeader buildCommonRpcHeader(int requestCode, Boolean oneway, String destBrokerName) {
         Class requestHeaderClass = requestCodeMap.get(requestCode);
         if (requestHeaderClass == null) {
             throw new UnsupportedOperationException("unknown " + requestCode);
         }
         try {
-            CommonRpcHeader requestHeader = (CommonRpcHeader) requestHeaderClass.newInstance();
+            RpcRequestHeader requestHeader = (RpcRequestHeader) requestHeaderClass.newInstance();
             requestHeader.setCode(requestCode);
             requestHeader.setOneway(oneway);
             requestHeader.setBname(destBrokerName);
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
index 81d5d04..8d557a1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -1,9 +1,5 @@
 package org.apache.rocketmq.common.rpc;
 
-import com.google.common.util.concurrent.Futures;
-import com.sun.org.apache.xpath.internal.functions.FuncPosition;
-import io.netty.util.concurrent.DefaultPromise;
-import io.netty.util.concurrent.EventExecutor;
 import io.netty.util.concurrent.ImmediateEventExecutor;
 import io.netty.util.concurrent.Promise;
 import org.apache.rocketmq.common.message.MessageQueue;
@@ -20,10 +16,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
 
 public class RpcClientImpl implements RpcClient {
 
@@ -68,12 +61,12 @@ public class RpcClientImpl implements RpcClient {
         String addr = getBrokerAddrByNameOrException(request.getHeader().bname);
         Promise<RpcResponse> rpcResponsePromise = null;
         try {
-            switch (request.getCode()) {
+            switch (request.getHeader().getCode()) {
                 case RequestCode.PULL_MESSAGE:
                     rpcResponsePromise = handlePullMessage(addr, request, timeoutMs);
                     break;
                 default:
-                    throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode());
+                    throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getHeader().getCode());
             }
         } catch (RpcException rpcException) {
             throw rpcException;
@@ -135,7 +128,8 @@ public class RpcClientImpl implements RpcClient {
                         case ResponseCode.PULL_OFFSET_MOVED:
                             PullMessageResponseHeader responseHeader =
                                     (PullMessageResponseHeader) responseCommand.decodeCommandCustomHeader(PullMessageResponseHeader.class);
-                            rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
+                            responseHeader.setCode(responseCommand.getCode());
+                            rpcResponsePromise.setSuccess(new RpcResponse(responseHeader, responseCommand.getBody()));
                         default:
                             RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unexpected remote response code"));
                             rpcResponsePromise.setSuccess(rpcResponse);
@@ -162,11 +156,11 @@ public class RpcClientImpl implements RpcClient {
             case ResponseCode.SUCCESS: {
                 SearchOffsetResponseHeader responseHeader =
                         (SearchOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
-                return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
+                responseHeader.setCode(responseCommand.getCode());
+                return new RpcResponse(responseHeader, responseCommand.getBody());
             }
             default:{
-                RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
-                rpcResponse.setException(new RpcException(responseCommand.getCode(), "unknown remote error"));
+                RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"));
                 return rpcResponse;
             }
         }
@@ -183,11 +177,11 @@ public class RpcClientImpl implements RpcClient {
             case ResponseCode.SUCCESS: {
                 GetMinOffsetResponseHeader responseHeader =
                         (GetMinOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
-                return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
+                responseHeader.setCode(responseCommand.getCode());
+                return new RpcResponse(responseHeader, responseCommand.getBody());
             }
             default:{
-                RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
-                rpcResponse.setException(new RpcException(responseCommand.getCode(), "unknown remote error"));
+                RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"));
                 return rpcResponse;
             }
         }
@@ -204,12 +198,12 @@ public class RpcClientImpl implements RpcClient {
             case ResponseCode.SUCCESS: {
                 GetEarliestMsgStoretimeResponseHeader responseHeader =
                         (GetEarliestMsgStoretimeResponseHeader) responseCommand.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
-                return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
+                responseHeader.setCode(responseCommand.getCode());
+                return new RpcResponse(responseHeader, responseCommand.getBody());
 
             }
             default:{
-                RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
-                rpcResponse.setException(new RpcException(responseCommand.getCode(), "unknown remote error"));
+                RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"));
                 return rpcResponse;
             }
         }
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java
index 3ae06b7..654b2ed 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java
@@ -8,13 +8,13 @@ import java.nio.ByteBuffer;
 public class RpcClientUtils {
 
     public static RemotingCommand createCommandForRpcRequest(RpcRequest rpcRequest) {
-        RemotingCommand cmd = RemotingCommand.createRequestCommand(rpcRequest.getCode(), rpcRequest.getHeader());
+        RemotingCommand cmd = RemotingCommand.createRequestCommandWithHeader(rpcRequest.getHeader().getCode(), rpcRequest.getHeader());
         cmd.setBody(encodeBody(rpcRequest.getBody()));
         return cmd;
     }
 
     public static RemotingCommand createCommandForRpcResponse(RpcResponse rpcResponse) {
-        RemotingCommand cmd = RemotingCommand.createResponseCommand(rpcResponse.getCode(), rpcResponse.getHeader());
+        RemotingCommand cmd = RemotingCommand.createResponseCommandWithHeader(rpcResponse.getCode(), rpcResponse.getHeader());
         cmd.setRemark(rpcResponse.getException() == null ? "" : rpcResponse.getException().getMessage());
         cmd.setBody(encodeBody(rpcResponse.getBody()));
         return cmd;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcHeader.java
similarity index 69%
copy from common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java
copy to common/src/main/java/org/apache/rocketmq/common/rpc/RpcHeader.java
index 6fc0fac..18b64d5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcHeader.java
@@ -14,29 +14,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-/**
- * $Id: GetMinOffsetResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
- */
-package org.apache.rocketmq.common.protocol.header;
+package org.apache.rocketmq.common.rpc;
 
 import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class GetMinOffsetResponseHeader implements CommandCustomHeader {
-    @CFNotNull
-    private Long offset;
+public class RpcHeader implements CommandCustomHeader {
 
-    @Override
-    public void checkFields() throws RemotingCommandException {
+
+    protected int code;
+
+    public RpcHeader() {
     }
 
-    public Long getOffset() {
-        return offset;
+    public RpcHeader(int code) {
+        this.code = code;
     }
 
-    public void setOffset(Long offset) {
-        this.offset = offset;
+    public int getCode() {
+        return code;
+    }
+
+    public void setCode(int code) {
+        this.code = code;
+    }
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java
index 06d5c79..3ebe3fe 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java
@@ -17,15 +17,15 @@
 package org.apache.rocketmq.common.rpc;
 
 public class RpcRequest {
-    private CommonRpcHeader header;
+    private RpcRequestHeader header;
     private Object body;
 
-    public RpcRequest(CommonRpcHeader header, Object body) {
+    public RpcRequest(RpcRequestHeader header, Object body) {
         this.header = header;
         this.body = body;
     }
 
-    public CommonRpcHeader getHeader() {
+    public RpcRequestHeader getHeader() {
         return header;
     }
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/CommonRpcHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
similarity index 85%
rename from common/src/main/java/org/apache/rocketmq/common/rpc/CommonRpcHeader.java
rename to common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
index df62d52..c5c748d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/CommonRpcHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
@@ -16,16 +16,11 @@
  */
 package org.apache.rocketmq.common.rpc;
 
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-
-public abstract class CommonRpcHeader implements CommandCustomHeader {
+public abstract class RpcRequestHeader extends RpcHeader {
     //the namespace name
     protected String namespace;
     //if the data has been namespaced
     protected Boolean namespaced;
-
-    protected int code;
-
     //the abstract remote addr name, usually the physical broker name
     protected String bname;
 
@@ -62,12 +57,4 @@ public abstract class CommonRpcHeader implements CommandCustomHeader {
     public void setNamespaced(Boolean namespaced) {
         this.namespaced = namespaced;
     }
-
-    public int getCode() {
-        return code;
-    }
-
-    public void setCode(int code) {
-        this.code = code;
-    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
index 601f942..be7cf9b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
@@ -16,11 +16,8 @@
  */
 package org.apache.rocketmq.common.rpc;
 
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-
 public class RpcResponse   {
-    private int code;
-    private CommandCustomHeader header;
+    private RpcHeader header;
     private Object body;
     public RpcException exception;
 
@@ -28,29 +25,32 @@ public class RpcResponse   {
 
     }
 
-    public RpcResponse(int code, CommandCustomHeader header, byte[] body) {
-        this.code = code;
+    public RpcResponse(RpcHeader header, byte[] body) {
         this.header = header;
         this.body = body;
     }
 
     public RpcResponse(RpcException rpcException) {
-        this.code = rpcException.getErrorCode();
+        this.header = new RpcHeader(rpcException.getErrorCode());
         this.exception = rpcException;
     }
 
-    public int getCode() {
-        return code;
+    public RpcHeader getHeader() {
+        return header;
     }
 
-    public CommandCustomHeader getHeader() {
-        return header;
+    public void setHeader(RpcHeader header) {
+        this.header = header;
     }
 
     public Object getBody() {
         return body;
     }
 
+    public void setBody(Object body) {
+        this.body = body;
+    }
+
     public RpcException getException() {
         return exception;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java
index 4b0a394..5d0a151 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java
@@ -16,7 +16,7 @@
  */
 package org.apache.rocketmq.common.rpc;
 
-public abstract class TopicQueueRequestHeader extends CommonRpcHeader {
+public abstract class TopicQueueRequestHeader extends RpcRequestHeader {
     //Physical or Logical
     protected Boolean physical;
 
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index 65ffaf5..08bb202 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -87,7 +87,7 @@ public class RemotingCommand {
     }
 
 
-    public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
+    public static RemotingCommand createRequestCommandWithHeader(int code, CommandCustomHeader customHeader) {
         RemotingCommand cmd = new RemotingCommand();
         cmd.setCode(code);
         cmd.customHeader = customHeader;
@@ -95,7 +95,7 @@ public class RemotingCommand {
         return cmd;
     }
 
-    public static RemotingCommand createResponseCommand(int code, CommandCustomHeader customHeader) {
+    public static RemotingCommand createResponseCommandWithHeader(int code, CommandCustomHeader customHeader) {
         RemotingCommand cmd = new RemotingCommand();
         cmd.setCode(code);
         cmd.markResponseType();