You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2021/10/09 08:05:18 UTC

[rocketmq-apis] branch main updated: Extract PrintThreadStackResponse and VerifyMessageConsumptionResponse… (#5)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-apis.git


The following commit(s) were added to refs/heads/main by this push:
     new a875b2d  Extract PrintThreadStackResponse and VerifyMessageConsumptionResponse… (#5)
a875b2d is described below

commit a875b2df5bcd00c321bf3814ff251527bafbbb09
Author: aaron ai <ya...@gmail.com>
AuthorDate: Sat Oct 9 16:05:12 2021 +0800

    Extract PrintThreadStackResponse and VerifyMessageConsumptionResponse… (#5)
    
    Split Multiplexing RPC into multiple RPCs and re-purpose them as follows:
    1. Poll commands from servers;
    2. Once a command is fetched, dispatch it to executors and re-initiate a new polling cycle for more commands;
    3. When a command is executed, use corresponding RPC to report results back to servers.
---
 apache/rocketmq/v1/service.proto | 102 ++++++++++++++++++++++++---------------
 1 file changed, 64 insertions(+), 38 deletions(-)

diff --git a/apache/rocketmq/v1/service.proto b/apache/rocketmq/v1/service.proto
index 1da4883..54b4217 100644
--- a/apache/rocketmq/v1/service.proto
+++ b/apache/rocketmq/v1/service.proto
@@ -264,74 +264,73 @@ message PullMessageResponse {
   reserved 6 to 64;
 }
 
-message GenericPollingRequest {
-  string client_id = 1;
-  repeated Resource topics = 2;
-  oneof group {
-    Resource producer_group = 3;
-    Resource consumer_group = 4;
-  }
+message NoopCommand { reserved 1 to 64; }
 
-  reserved 5 to 64;
-}
-
-message GenericPollingResponse {
-  ResponseCommon common = 1;
+message PrintThreadStackTraceCommand {
+  string command_id = 1;
 
   reserved 2 to 64;
 }
 
-message PrintThreadStackRequest {
-  string mid = 1;
+message ReportThreadStackTraceRequest {
+  string command_id = 1;
+  string thread_stack_trace = 2;
 
-  reserved 2 to 64;
+  reserved 3 to 64;
 }
 
-message PrintThreadStackResponse {
+message ReportThreadStackTraceResponse {
   ResponseCommon common = 1;
-  string mid = 2;
-  string stack_trace = 3;
 
-  reserved 4 to 64;
+  reserved 2 to 64;
 }
 
-message VerifyMessageConsumptionRequest {
-  string mid = 1;
+message VerifyMessageConsumptionCommand {
+  string command_id = 1;
   Message message = 2;
 
   reserved 3 to 64;
 }
 
-message VerifyMessageConsumptionResponse {
-  string mid = 1;
-  ResponseCommon common = 2;
+message ReportMessageConsumptionResultRequest {
+  string command_id = 1;
+  google.rpc.Status status = 2;
+}
 
-  reserved 3 to 64;
+message ReportMessageConsumptionResultResponse {
+  ResponseCommon common = 1;
+
+  reserved 2 to 64;
 }
 
-message RecoverOrphanedTransactionRequest {
+message RecoverOrphanedTransactionCommand {
   Message orphaned_transactional_message = 1;
   string transaction_id = 2;
 
   reserved 3 to 64;
 }
 
-message MultiplexingRequest {
-  oneof type {
-    GenericPollingRequest polling_request = 1;
-    PrintThreadStackResponse print_thread_stack_response = 2;
-    VerifyMessageConsumptionResponse verify_message_consumption_response = 3;
+message PollCommandRequest {
+  string client_id = 1;
+  repeated Resource topics = 2;
+  oneof group {
+    Resource producer_group = 3;
+    Resource consumer_group = 4;
   }
 
-  reserved 4 to 64;
+  reserved 5 to 64;
 }
 
-message MultiplexingResponse {
+message PollCommandResponse {
   oneof type {
-    GenericPollingResponse polling_response = 1;
-    PrintThreadStackRequest print_thread_stack_request = 2;
-    VerifyMessageConsumptionRequest verify_message_consumption_request = 3;
-    RecoverOrphanedTransactionRequest recover_orphaned_transaction_request = 4;
+    // Default command when no new command need to be delivered.
+    NoopCommand noop_command = 1;
+    // Request client to print thread stack trace.
+    PrintThreadStackTraceCommand print_thread_stack_trace_command = 2;
+    // Request client to verify the consumption of the appointed message.
+    VerifyMessageConsumptionCommand verify_message_consumption_command = 3;
+    // Request client to recover the orphaned transaction message.
+    RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 4;
   }
 
   reserved 5 to 64;
@@ -435,8 +434,35 @@ service MessagingService {
   // may suffer from false empty responses.
   rpc PullMessage(PullMessageRequest) returns (PullMessageResponse) {}
 
-  rpc MultiplexingCall(MultiplexingRequest) returns (MultiplexingResponse) {}
+  // Multiplexing RPC(s) for various polling requests, which issue different
+  // commands to client.
+  //
+  // Sometimes client may need to receive and process the command from server.
+  // To prevent the complexity of streaming RPC(s), a unary RPC using
+  // long-polling is another solution.
+  //
+  // To mark the request-response of corresponding command, `command_id` in
+  // message is recorded in the subsequent RPC(s). For example, after receiving
+  // command of printing thread stack trace, client would send
+  // `ReportMessageConsumptionResultRequest` to server, which contain both of
+  // the stack trace and `command_id`.
+  //
+  // At same time, `NoopCommand` is delivered from server when no new command is
+  // needed, it is essential for client to maintain the ping-pong.
+  //
+  rpc PollCommand(PollCommandRequest) returns (PollCommandResponse) {}
+
+  // After receiving the corresponding polling command, the thread stack trace
+  // is reported to the server.
+  rpc ReportThreadStackTrace(ReportThreadStackTraceRequest)
+      returns (ReportThreadStackTraceResponse) {}
+
+  // After receiving the corresponding polling command, the consumption result
+  // of appointed message is reported to the server.
+  rpc ReportMessageConsumptionResult(ReportMessageConsumptionResultRequest)
+      returns (ReportMessageConsumptionResultResponse) {}
 
+  // Notify the server that the client is terminated.
   rpc NotifyClientTermination(NotifyClientTerminationRequest)
       returns (NotifyClientTerminationResponse) {}
 }
\ No newline at end of file