You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/03/21 04:08:47 UTC
[rocketmq-apis] 02/02: Use stream telemetry and receive message response
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch v2
in repository https://gitbox.apache.org/repos/asf/rocketmq-apis.git
commit 5cae79245c9bf2b2d28bced8e87af4f960fca429
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Mon Mar 21 04:03:00 2022 +0000
Use stream telemetry and receive message response
---
apache/rocketmq/v2/service.proto | 105 ++++++++++++++-------------------------
1 file changed, 37 insertions(+), 68 deletions(-)
diff --git a/apache/rocketmq/v2/service.proto b/apache/rocketmq/v2/service.proto
index 896ce41..d3e4505 100644
--- a/apache/rocketmq/v2/service.proto
+++ b/apache/rocketmq/v2/service.proto
@@ -279,59 +279,41 @@ message PullMessageResponse {
reserved 6 to 64;
}
-message NoopCommand { reserved 1 to 64; }
-
message PrintThreadStackTraceCommand {
- string command_id = 1;
-
+ int64 command_id = 1;
reserved 2 to 64;
}
-message ReportThreadStackTraceRequest {
- string command_id = 1;
+message ThreadStackTrace {
+ int64 command_id = 1;
string thread_stack_trace = 2;
reserved 3 to 64;
}
-message ReportThreadStackTraceResponse {
- ResponseCommon common = 1;
-
- reserved 2 to 64;
-}
-
-message VerifyMessageConsumptionCommand {
- string command_id = 1;
+message VerifyMessageCommand {
+ int64 command_id = 1;
Message message = 2;
reserved 3 to 64;
}
-message ReportMessageConsumptionResultRequest {
- string command_id = 1;
-
- // 1. Return `INVALID_ARGUMENT` if message is corrupted.
- // 2. Return `INTERNAL` if failed to consume message.
- // 3. Return `OK` if success.
- google.rpc.Status status = 2;
+message VerifyMessageResult {
+ int64 command_id = 1;
+ ResponseCommon common = 2;
reserved 3 to 64;
}
-message ReportMessageConsumptionResultResponse {
- ResponseCommon common = 1;
-
- reserved 2 to 64;
-}
-
message RecoverOrphanedTransactionCommand {
- Message orphaned_transactional_message = 1;
- string transaction_id = 2;
+ int64 command_id = 1;
+ Message orphaned_transactional_message = 2;
+ string transaction_id = 3;
- reserved 3 to 64;
+ reserved 4 to 64;
}
-message PollCommandRequest {
+message Settings {
string client_id = 1;
repeated Resource topics = 2;
oneof group {
@@ -342,19 +324,26 @@ message PollCommandRequest {
reserved 5 to 64;
}
-message PollCommandResponse {
- oneof type {
- // Default command when no new command need to be delivered.
- NoopCommand noop_command = 1;
+message TelemetryCommand {
+
+ oneof command {
+ Settings settings = 1;
+
+ // Request client to recover the orphaned transaction message.
+ RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 2;
+
// Request client to print thread stack trace.
- PrintThreadStackTraceCommand print_thread_stack_trace_command = 2;
+ PrintThreadStackTraceCommand print_thread_stack_trace_command = 3;
+
+ ThreadStackTrace thread_stack_trace = 4;
+
// Request client to verify the consumption of the appointed message.
- VerifyMessageConsumptionCommand verify_message_consumption_command = 3;
- // Request client to recover the orphaned transaction message.
- RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 4;
+ VerifyMessageCommand verify_message_command = 5;
+
+ VerifyMessageResult verify_message_result = 6;
}
- reserved 5 to 64;
+ reserved 7 to 64;
}
message NotifyClientTerminationRequest {
@@ -463,7 +452,7 @@ service MessagingService {
// or consumer group doesn't exist, returns `NOT_FOUND`. If there is no new
// message in the specific topic, returns `OK` with an empty message set.
// Please note that client may suffer from false empty responses.
- rpc ReceiveMessage(ReceiveMessageRequest) returns (ReceiveMessageResponse) {}
+ rpc ReceiveMessage(ReceiveMessageRequest) returns (stream ReceiveMessageResponse) {}
// Acknowledges the message associated with the `receipt_handle` or `offset`
// in the `AckMessageRequest`, it means the message has been successfully
@@ -508,33 +497,13 @@ service MessagingService {
// Please note that client may suffer from false empty responses.
rpc PullMessage(PullMessageRequest) returns (PullMessageResponse) {}
- // Multiplexing RPC(s) for various polling requests, which issue different
- // commands to client.
- //
- // Sometimes client may need to receive and process the command from server.
- // To prevent the complexity of streaming RPC(s), a unary RPC using
- // long-polling is another solution.
- //
- // To mark the request-response of corresponding command, `command_id` in
- // message is recorded in the subsequent RPC(s). For example, after receiving
- // command of printing thread stack trace, client would send
- // `ReportMessageConsumptionResultRequest` to server, which contain both of
- // the stack trace and `command_id`.
- //
- // At same time, `NoopCommand` is delivered from server when no new command is
- // needed, it is essential for client to maintain the ping-pong.
- //
- rpc PollCommand(PollCommandRequest) returns (PollCommandResponse) {}
-
- // After receiving the corresponding polling command, the thread stack trace
- // is reported to the server.
- rpc ReportThreadStackTrace(ReportThreadStackTraceRequest)
- returns (ReportThreadStackTraceResponse) {}
-
- // After receiving the corresponding polling command, the consumption result
- // of appointed message is reported to the server.
- rpc ReportMessageConsumptionResult(ReportMessageConsumptionResultRequest)
- returns (ReportMessageConsumptionResultResponse) {}
+ // Once a client starts, it immediately establishes bi-directional streaming
+ // RPCs with brokers, reporting its settings as the initial command.
+ //
+ // When servers need to inspect client status, they issue telemetry
+ // commands to clients. After executing received instructions, clients shall
+ // report command execution results through client-side streams.
+ rpc Telemetry(stream TelemetryCommand) returns (stream TelemetryCommand) {}
// Notify the server that the client is terminated.
rpc NotifyClientTermination(NotifyClientTerminationRequest)