You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2021/10/09 08:05:18 UTC
[rocketmq-apis] branch main updated: Extract PrintThreadStackResponse and VerifyMessageConsumptionResponse… (#5)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-apis.git
The following commit(s) were added to refs/heads/main by this push:
new a875b2d Extract PrintThreadStackResponse and VerifyMessageConsumptionResponse… (#5)
a875b2d is described below
commit a875b2df5bcd00c321bf3814ff251527bafbbb09
Author: aaron ai <ya...@gmail.com>
AuthorDate: Sat Oct 9 16:05:12 2021 +0800
Extract PrintThreadStackResponse and VerifyMessageConsumptionResponse… (#5)
Split Multiplexing RPC into multiple RPCs and re-purpose them as follows:
1. Poll commands from servers;
2. Once a command is fetched, dispatch it to executors and re-initiate a new polling cycle for more commands;
3. When a command is executed, use corresponding RPC to report results back to servers.
---
apache/rocketmq/v1/service.proto | 102 ++++++++++++++++++++++++---------------
1 file changed, 64 insertions(+), 38 deletions(-)
diff --git a/apache/rocketmq/v1/service.proto b/apache/rocketmq/v1/service.proto
index 1da4883..54b4217 100644
--- a/apache/rocketmq/v1/service.proto
+++ b/apache/rocketmq/v1/service.proto
@@ -264,74 +264,73 @@ message PullMessageResponse {
reserved 6 to 64;
}
-message GenericPollingRequest {
- string client_id = 1;
- repeated Resource topics = 2;
- oneof group {
- Resource producer_group = 3;
- Resource consumer_group = 4;
- }
+message NoopCommand { reserved 1 to 64; }
- reserved 5 to 64;
-}
-
-message GenericPollingResponse {
- ResponseCommon common = 1;
+message PrintThreadStackTraceCommand {
+ string command_id = 1;
reserved 2 to 64;
}
-message PrintThreadStackRequest {
- string mid = 1;
+message ReportThreadStackTraceRequest {
+ string command_id = 1;
+ string thread_stack_trace = 2;
- reserved 2 to 64;
+ reserved 3 to 64;
}
-message PrintThreadStackResponse {
+message ReportThreadStackTraceResponse {
ResponseCommon common = 1;
- string mid = 2;
- string stack_trace = 3;
- reserved 4 to 64;
+ reserved 2 to 64;
}
-message VerifyMessageConsumptionRequest {
- string mid = 1;
+message VerifyMessageConsumptionCommand {
+ string command_id = 1;
Message message = 2;
reserved 3 to 64;
}
-message VerifyMessageConsumptionResponse {
- string mid = 1;
- ResponseCommon common = 2;
+message ReportMessageConsumptionResultRequest {
+ string command_id = 1;
+ google.rpc.Status status = 2;
+}
- reserved 3 to 64;
+message ReportMessageConsumptionResultResponse {
+ ResponseCommon common = 1;
+
+ reserved 2 to 64;
}
-message RecoverOrphanedTransactionRequest {
+message RecoverOrphanedTransactionCommand {
Message orphaned_transactional_message = 1;
string transaction_id = 2;
reserved 3 to 64;
}
-message MultiplexingRequest {
- oneof type {
- GenericPollingRequest polling_request = 1;
- PrintThreadStackResponse print_thread_stack_response = 2;
- VerifyMessageConsumptionResponse verify_message_consumption_response = 3;
+message PollCommandRequest {
+ string client_id = 1;
+ repeated Resource topics = 2;
+ oneof group {
+ Resource producer_group = 3;
+ Resource consumer_group = 4;
}
- reserved 4 to 64;
+ reserved 5 to 64;
}
-message MultiplexingResponse {
+message PollCommandResponse {
oneof type {
- GenericPollingResponse polling_response = 1;
- PrintThreadStackRequest print_thread_stack_request = 2;
- VerifyMessageConsumptionRequest verify_message_consumption_request = 3;
- RecoverOrphanedTransactionRequest recover_orphaned_transaction_request = 4;
+ // Default command when no new command need to be delivered.
+ NoopCommand noop_command = 1;
+ // Request client to print thread stack trace.
+ PrintThreadStackTraceCommand print_thread_stack_trace_command = 2;
+ // Request client to verify the consumption of the appointed message.
+ VerifyMessageConsumptionCommand verify_message_consumption_command = 3;
+ // Request client to recover the orphaned transaction message.
+ RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 4;
}
reserved 5 to 64;
@@ -435,8 +434,35 @@ service MessagingService {
// may suffer from false empty responses.
rpc PullMessage(PullMessageRequest) returns (PullMessageResponse) {}
- rpc MultiplexingCall(MultiplexingRequest) returns (MultiplexingResponse) {}
+ // Multiplexing RPC(s) for various polling requests, which issue different
+ // commands to client.
+ //
+ // Sometimes client may need to receive and process the command from server.
+ // To prevent the complexity of streaming RPC(s), a unary RPC using
+ // long-polling is another solution.
+ //
+ // To mark the request-response of corresponding command, `command_id` in
+ // message is recorded in the subsequent RPC(s). For example, after receiving
+ // command of printing thread stack trace, client would send
+ // `ReportMessageConsumptionResultRequest` to server, which contain both of
+ // the stack trace and `command_id`.
+ //
+ // At same time, `NoopCommand` is delivered from server when no new command is
+ // needed, it is essential for client to maintain the ping-pong.
+ //
+ rpc PollCommand(PollCommandRequest) returns (PollCommandResponse) {}
+
+ // After receiving the corresponding polling command, the thread stack trace
+ // is reported to the server.
+ rpc ReportThreadStackTrace(ReportThreadStackTraceRequest)
+ returns (ReportThreadStackTraceResponse) {}
+
+ // After receiving the corresponding polling command, the consumption result
+ // of appointed message is reported to the server.
+ rpc ReportMessageConsumptionResult(ReportMessageConsumptionResultRequest)
+ returns (ReportMessageConsumptionResultResponse) {}
+ // Notify the server that the client is terminated.
rpc NotifyClientTermination(NotifyClientTerminationRequest)
returns (NotifyClientTerminationResponse) {}
}
\ No newline at end of file