You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/03/21 04:08:47 UTC

[rocketmq-apis] 02/02: Use stream telemetry and receive message response

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch v2
in repository https://gitbox.apache.org/repos/asf/rocketmq-apis.git

commit 5cae79245c9bf2b2d28bced8e87af4f960fca429
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Mon Mar 21 04:03:00 2022 +0000

    Use stream telemetry and receive message response
---
 apache/rocketmq/v2/service.proto | 105 ++++++++++++++-------------------------
 1 file changed, 37 insertions(+), 68 deletions(-)

diff --git a/apache/rocketmq/v2/service.proto b/apache/rocketmq/v2/service.proto
index 896ce41..d3e4505 100644
--- a/apache/rocketmq/v2/service.proto
+++ b/apache/rocketmq/v2/service.proto
@@ -279,59 +279,41 @@ message PullMessageResponse {
   reserved 6 to 64;
 }
 
-message NoopCommand { reserved 1 to 64; }
-
 message PrintThreadStackTraceCommand {
-  string command_id = 1;
-
+  int64 command_id = 1;
   reserved 2 to 64;
 }
 
-message ReportThreadStackTraceRequest {
-  string command_id = 1;
+message ThreadStackTrace {
+  int64 command_id = 1;
   string thread_stack_trace = 2;
 
   reserved 3 to 64;
 }
 
-message ReportThreadStackTraceResponse {
-  ResponseCommon common = 1;
-
-  reserved 2 to 64;
-}
-
-message VerifyMessageConsumptionCommand {
-  string command_id = 1;
+message VerifyMessageCommand {
+  int64 command_id = 1;
   Message message = 2;
 
   reserved 3 to 64;
 }
 
-message ReportMessageConsumptionResultRequest {
-  string command_id = 1;
-
-  // 1. Return `INVALID_ARGUMENT` if message is corrupted.
-  // 2. Return `INTERNAL` if failed to consume message.
-  // 3. Return `OK` if success.
-  google.rpc.Status status = 2;
+message VerifyMessageResult {
+  int64 command_id = 1;
+  ResponseCommon common = 2;
 
   reserved 3 to 64;
 }
 
-message ReportMessageConsumptionResultResponse {
-  ResponseCommon common = 1;
-
-  reserved 2 to 64;
-}
-
 message RecoverOrphanedTransactionCommand {
-  Message orphaned_transactional_message = 1;
-  string transaction_id = 2;
+  int64 command_id = 1;
+  Message orphaned_transactional_message = 2;
+  string transaction_id = 3;
 
-  reserved 3 to 64;
+  reserved 4 to 64;
 }
 
-message PollCommandRequest {
+message Settings {
   string client_id = 1;
   repeated Resource topics = 2;
   oneof group {
@@ -342,19 +324,26 @@ message PollCommandRequest {
   reserved 5 to 64;
 }
 
-message PollCommandResponse {
-  oneof type {
-    // Default command when no new command need to be delivered.
-    NoopCommand noop_command = 1;
+message TelemetryCommand {
+
+  oneof command {
+    Settings settings = 1;
+
+    // Request client to recover the orphaned transaction message.
+    RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 2;
+
     // Request client to print thread stack trace.
-    PrintThreadStackTraceCommand print_thread_stack_trace_command = 2;
+    PrintThreadStackTraceCommand print_thread_stack_trace_command = 3;
+
+    ThreadStackTrace thread_stack_trace = 4;
+    
     // Request client to verify the consumption of the appointed message.
-    VerifyMessageConsumptionCommand verify_message_consumption_command = 3;
-    // Request client to recover the orphaned transaction message.
-    RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 4;
+    VerifyMessageCommand verify_message_command = 5;
+
+    VerifyMessageResult verify_message_result = 6;
   }
 
-  reserved 5 to 64;
+  reserved 7 to 64;
 }
 
 message NotifyClientTerminationRequest {
@@ -463,7 +452,7 @@ service MessagingService {
   // or consumer group doesn't exist, returns `NOT_FOUND`. If there is no new
   // message in the specific topic, returns `OK` with an empty message set.
   // Please note that client may suffer from false empty responses.
-  rpc ReceiveMessage(ReceiveMessageRequest) returns (ReceiveMessageResponse) {}
+  rpc ReceiveMessage(ReceiveMessageRequest) returns (stream ReceiveMessageResponse) {}
 
   // Acknowledges the message associated with the `receipt_handle` or `offset`
   // in the `AckMessageRequest`, it means the message has been successfully
@@ -508,33 +497,13 @@ service MessagingService {
   // Please note that client may suffer from false empty responses.
   rpc PullMessage(PullMessageRequest) returns (PullMessageResponse) {}
 
-  // Multiplexing RPC(s) for various polling requests, which issue different
-  // commands to client.
-  //
-  // Sometimes client may need to receive and process the command from server.
-  // To prevent the complexity of streaming RPC(s), a unary RPC using
-  // long-polling is another solution.
-  //
-  // To mark the request-response of corresponding command, `command_id` in
-  // message is recorded in the subsequent RPC(s). For example, after receiving
-  // command of printing thread stack trace, client would send
-  // `ReportMessageConsumptionResultRequest` to server, which contain both of
-  // the stack trace and `command_id`.
-  //
-  // At same time, `NoopCommand` is delivered from server when no new command is
-  // needed, it is essential for client to maintain the ping-pong.
-  //
-  rpc PollCommand(PollCommandRequest) returns (PollCommandResponse) {}
-
-  // After receiving the corresponding polling command, the thread stack trace
-  // is reported to the server.
-  rpc ReportThreadStackTrace(ReportThreadStackTraceRequest)
-      returns (ReportThreadStackTraceResponse) {}
-
-  // After receiving the corresponding polling command, the consumption result
-  // of appointed message is reported to the server.
-  rpc ReportMessageConsumptionResult(ReportMessageConsumptionResultRequest)
-      returns (ReportMessageConsumptionResultResponse) {}
+  // Once a client starts, it immediately establishes bidirectional streaming
+  // RPCs with brokers, reporting its settings as the initial command.
+  // 
+  // When servers need to inspect client status, they issue telemetry
+  // commands to clients. After executing received instructions, clients shall
+  // report command execution results through client-side streams.
+  rpc Telemetry(stream TelemetryCommand) returns (stream TelemetryCommand) {}
 
   // Notify the server that the client is terminated.
   rpc NotifyClientTermination(NotifyClientTerminationRequest)