You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/16 07:55:13 UTC
[rocketmq] 02/02: Rename the rpc header
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 6381e6e19fd7658097923d0f9d1663b97ad4efe0
Author: dongeforever <do...@apache.org>
AuthorDate: Tue Nov 16 15:54:58 2021 +0800
Rename the rpc header
---
.../GetEarliestMsgStoretimeResponseHeader.java | 3 +-
.../header/GetMinOffsetResponseHeader.java | 3 +-
.../protocol/header/PullMessageResponseHeader.java | 3 +-
.../header/SearchOffsetResponseHeader.java | 3 +-
.../apache/rocketmq/common/rpc/RequestBuilder.java | 6 ++--
.../apache/rocketmq/common/rpc/RpcClientImpl.java | 32 +++++++++-----------
.../apache/rocketmq/common/rpc/RpcClientUtils.java | 4 +--
.../RpcHeader.java} | 34 ++++++++++++----------
.../org/apache/rocketmq/common/rpc/RpcRequest.java | 6 ++--
...{CommonRpcHeader.java => RpcRequestHeader.java} | 15 +---------
.../apache/rocketmq/common/rpc/RpcResponse.java | 22 +++++++-------
.../common/rpc/TopicQueueRequestHeader.java | 2 +-
.../remoting/protocol/RemotingCommand.java | 4 +--
13 files changed, 63 insertions(+), 74 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java
index 6b9b3b2..10e7d82 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java
@@ -20,11 +20,12 @@
*/
package org.apache.rocketmq.common.protocol.header;
+import org.apache.rocketmq.common.rpc.RpcHeader;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class GetEarliestMsgStoretimeResponseHeader implements CommandCustomHeader {
+public class GetEarliestMsgStoretimeResponseHeader extends RpcHeader {
@CFNotNull
private Long timestamp;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java
index 6fc0fac..57cb050 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java
@@ -20,11 +20,12 @@
*/
package org.apache.rocketmq.common.protocol.header;
+import org.apache.rocketmq.common.rpc.RpcHeader;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class GetMinOffsetResponseHeader implements CommandCustomHeader {
+public class GetMinOffsetResponseHeader extends RpcHeader {
@CFNotNull
private Long offset;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
index 88af984..e2f896b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
@@ -20,12 +20,13 @@
*/
package org.apache.rocketmq.common.protocol.header;
+import org.apache.rocketmq.common.rpc.RpcHeader;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class PullMessageResponseHeader implements CommandCustomHeader {
+public class PullMessageResponseHeader extends RpcHeader {
@CFNotNull
private Long suggestWhichBrokerId;
@CFNotNull
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java
index f88ac68..416f7f7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java
@@ -20,11 +20,12 @@
*/
package org.apache.rocketmq.common.protocol.header;
+import org.apache.rocketmq.common.rpc.RpcHeader;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class SearchOffsetResponseHeader implements CommandCustomHeader {
+public class SearchOffsetResponseHeader extends RpcHeader {
@CFNotNull
private Long offset;
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
index 7655871..a791543 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
@@ -14,17 +14,17 @@ public class RequestBuilder {
requestCodeMap.put(RequestCode.PULL_MESSAGE, PullMessageRequestHeader.class);
}
- public static CommonRpcHeader buildCommonRpcHeader(int requestCode, String destBrokerName) {
+ public static RpcRequestHeader buildCommonRpcHeader(int requestCode, String destBrokerName) {
return buildCommonRpcHeader(requestCode, null, destBrokerName);
}
- public static CommonRpcHeader buildCommonRpcHeader(int requestCode, Boolean oneway, String destBrokerName) {
+ public static RpcRequestHeader buildCommonRpcHeader(int requestCode, Boolean oneway, String destBrokerName) {
Class requestHeaderClass = requestCodeMap.get(requestCode);
if (requestHeaderClass == null) {
throw new UnsupportedOperationException("unknown " + requestCode);
}
try {
- CommonRpcHeader requestHeader = (CommonRpcHeader) requestHeaderClass.newInstance();
+ RpcRequestHeader requestHeader = (RpcRequestHeader) requestHeaderClass.newInstance();
requestHeader.setCode(requestCode);
requestHeader.setOneway(oneway);
requestHeader.setBname(destBrokerName);
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
index 81d5d04..8d557a1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -1,9 +1,5 @@
package org.apache.rocketmq.common.rpc;
-import com.google.common.util.concurrent.Futures;
-import com.sun.org.apache.xpath.internal.functions.FuncPosition;
-import io.netty.util.concurrent.DefaultPromise;
-import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import org.apache.rocketmq.common.message.MessageQueue;
@@ -20,10 +16,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
public class RpcClientImpl implements RpcClient {
@@ -68,12 +61,12 @@ public class RpcClientImpl implements RpcClient {
String addr = getBrokerAddrByNameOrException(request.getHeader().bname);
Promise<RpcResponse> rpcResponsePromise = null;
try {
- switch (request.getCode()) {
+ switch (request.getHeader().getCode()) {
case RequestCode.PULL_MESSAGE:
rpcResponsePromise = handlePullMessage(addr, request, timeoutMs);
break;
default:
- throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode());
+ throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getHeader().getCode());
}
} catch (RpcException rpcException) {
throw rpcException;
@@ -135,7 +128,8 @@ public class RpcClientImpl implements RpcClient {
case ResponseCode.PULL_OFFSET_MOVED:
PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader) responseCommand.decodeCommandCustomHeader(PullMessageResponseHeader.class);
- rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
+ responseHeader.setCode(responseCommand.getCode());
+ rpcResponsePromise.setSuccess(new RpcResponse(responseHeader, responseCommand.getBody()));
default:
RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unexpected remote response code"));
rpcResponsePromise.setSuccess(rpcResponse);
@@ -162,11 +156,11 @@ public class RpcClientImpl implements RpcClient {
case ResponseCode.SUCCESS: {
SearchOffsetResponseHeader responseHeader =
(SearchOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
- return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
+ responseHeader.setCode(responseCommand.getCode());
+ return new RpcResponse(responseHeader, responseCommand.getBody());
}
default:{
- RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
- rpcResponse.setException(new RpcException(responseCommand.getCode(), "unknown remote error"));
+ RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"));
return rpcResponse;
}
}
@@ -183,11 +177,11 @@ public class RpcClientImpl implements RpcClient {
case ResponseCode.SUCCESS: {
GetMinOffsetResponseHeader responseHeader =
(GetMinOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
- return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
+ responseHeader.setCode(responseCommand.getCode());
+ return new RpcResponse(responseHeader, responseCommand.getBody());
}
default:{
- RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
- rpcResponse.setException(new RpcException(responseCommand.getCode(), "unknown remote error"));
+ RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"));
return rpcResponse;
}
}
@@ -204,12 +198,12 @@ public class RpcClientImpl implements RpcClient {
case ResponseCode.SUCCESS: {
GetEarliestMsgStoretimeResponseHeader responseHeader =
(GetEarliestMsgStoretimeResponseHeader) responseCommand.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
- return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
+ responseHeader.setCode(responseCommand.getCode());
+ return new RpcResponse(responseHeader, responseCommand.getBody());
}
default:{
- RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
- rpcResponse.setException(new RpcException(responseCommand.getCode(), "unknown remote error"));
+ RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"));
return rpcResponse;
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java
index 3ae06b7..654b2ed 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java
@@ -8,13 +8,13 @@ import java.nio.ByteBuffer;
public class RpcClientUtils {
public static RemotingCommand createCommandForRpcRequest(RpcRequest rpcRequest) {
- RemotingCommand cmd = RemotingCommand.createRequestCommand(rpcRequest.getCode(), rpcRequest.getHeader());
+ RemotingCommand cmd = RemotingCommand.createRequestCommandWithHeader(rpcRequest.getHeader().getCode(), rpcRequest.getHeader());
cmd.setBody(encodeBody(rpcRequest.getBody()));
return cmd;
}
public static RemotingCommand createCommandForRpcResponse(RpcResponse rpcResponse) {
- RemotingCommand cmd = RemotingCommand.createResponseCommand(rpcResponse.getCode(), rpcResponse.getHeader());
+ RemotingCommand cmd = RemotingCommand.createResponseCommandWithHeader(rpcResponse.getCode(), rpcResponse.getHeader());
cmd.setRemark(rpcResponse.getException() == null ? "" : rpcResponse.getException().getMessage());
cmd.setBody(encodeBody(rpcResponse.getBody()));
return cmd;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcHeader.java
similarity index 69%
copy from common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java
copy to common/src/main/java/org/apache/rocketmq/common/rpc/RpcHeader.java
index 6fc0fac..18b64d5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcHeader.java
@@ -14,29 +14,33 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-/**
- * $Id: GetMinOffsetResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
- */
-package org.apache.rocketmq.common.protocol.header;
+package org.apache.rocketmq.common.rpc;
import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class GetMinOffsetResponseHeader implements CommandCustomHeader {
- @CFNotNull
- private Long offset;
+public class RpcHeader implements CommandCustomHeader {
- @Override
- public void checkFields() throws RemotingCommandException {
+
+ protected int code;
+
+ public RpcHeader() {
}
- public Long getOffset() {
- return offset;
+ public RpcHeader(int code) {
+ this.code = code;
}
- public void setOffset(Long offset) {
- this.offset = offset;
+ public int getCode() {
+ return code;
+ }
+
+ public void setCode(int code) {
+ this.code = code;
+ }
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java
index 06d5c79..3ebe3fe 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java
@@ -17,15 +17,15 @@
package org.apache.rocketmq.common.rpc;
public class RpcRequest {
- private CommonRpcHeader header;
+ private RpcRequestHeader header;
private Object body;
- public RpcRequest(CommonRpcHeader header, Object body) {
+ public RpcRequest(RpcRequestHeader header, Object body) {
this.header = header;
this.body = body;
}
- public CommonRpcHeader getHeader() {
+ public RpcRequestHeader getHeader() {
return header;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/CommonRpcHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
similarity index 85%
rename from common/src/main/java/org/apache/rocketmq/common/rpc/CommonRpcHeader.java
rename to common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
index df62d52..c5c748d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/CommonRpcHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
@@ -16,16 +16,11 @@
*/
package org.apache.rocketmq.common.rpc;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-
-public abstract class CommonRpcHeader implements CommandCustomHeader {
+public abstract class RpcRequestHeader extends RpcHeader {
//the namespace name
protected String namespace;
//if the data has been namespaced
protected Boolean namespaced;
-
- protected int code;
-
//the abstract remote addr name, usually the physical broker name
protected String bname;
@@ -62,12 +57,4 @@ public abstract class CommonRpcHeader implements CommandCustomHeader {
public void setNamespaced(Boolean namespaced) {
this.namespaced = namespaced;
}
-
- public int getCode() {
- return code;
- }
-
- public void setCode(int code) {
- this.code = code;
- }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
index 601f942..be7cf9b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
@@ -16,11 +16,8 @@
*/
package org.apache.rocketmq.common.rpc;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-
public class RpcResponse {
- private int code;
- private CommandCustomHeader header;
+ private RpcHeader header;
private Object body;
public RpcException exception;
@@ -28,29 +25,32 @@ public class RpcResponse {
}
- public RpcResponse(int code, CommandCustomHeader header, byte[] body) {
- this.code = code;
+ public RpcResponse(RpcHeader header, byte[] body) {
this.header = header;
this.body = body;
}
public RpcResponse(RpcException rpcException) {
- this.code = rpcException.getErrorCode();
+ this.header = new RpcHeader(rpcException.getErrorCode());
this.exception = rpcException;
}
- public int getCode() {
- return code;
+ public RpcHeader getHeader() {
+ return header;
}
- public CommandCustomHeader getHeader() {
- return header;
+ public void setHeader(RpcHeader header) {
+ this.header = header;
}
public Object getBody() {
return body;
}
+ public void setBody(Object body) {
+ this.body = body;
+ }
+
public RpcException getException() {
return exception;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java
index 4b0a394..5d0a151 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java
@@ -16,7 +16,7 @@
*/
package org.apache.rocketmq.common.rpc;
-public abstract class TopicQueueRequestHeader extends CommonRpcHeader {
+public abstract class TopicQueueRequestHeader extends RpcRequestHeader {
//Physical or Logical
protected Boolean physical;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index 65ffaf5..08bb202 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -87,7 +87,7 @@ public class RemotingCommand {
}
- public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
+ public static RemotingCommand createRequestCommandWithHeader(int code, CommandCustomHeader customHeader) {
RemotingCommand cmd = new RemotingCommand();
cmd.setCode(code);
cmd.customHeader = customHeader;
@@ -95,7 +95,7 @@ public class RemotingCommand {
return cmd;
}
- public static RemotingCommand createResponseCommand(int code, CommandCustomHeader customHeader) {
+ public static RemotingCommand createResponseCommandWithHeader(int code, CommandCustomHeader customHeader) {
RemotingCommand cmd = new RemotingCommand();
cmd.setCode(code);
cmd.markResponseType();