You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/03/21 04:08:45 UTC

[rocketmq-apis] branch v2 created (now 5cae792)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a change to branch v2
in repository https://gitbox.apache.org/repos/asf/rocketmq-apis.git.


      at 5cae792  Use stream telemetry and receive message response

This branch includes the following new commits:

     new 9e15840  Upgrade version to 2.0
     new 5cae792  Use stream telemetry and receive message response

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[rocketmq-apis] 02/02: Use stream telemetry and receive message response

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch v2
in repository https://gitbox.apache.org/repos/asf/rocketmq-apis.git

commit 5cae79245c9bf2b2d28bced8e87af4f960fca429
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Mon Mar 21 04:03:00 2022 +0000

    Use stream telemetry and receive message response
---
 apache/rocketmq/v2/service.proto | 105 ++++++++++++++-------------------------
 1 file changed, 37 insertions(+), 68 deletions(-)

diff --git a/apache/rocketmq/v2/service.proto b/apache/rocketmq/v2/service.proto
index 896ce41..d3e4505 100644
--- a/apache/rocketmq/v2/service.proto
+++ b/apache/rocketmq/v2/service.proto
@@ -279,59 +279,41 @@ message PullMessageResponse {
   reserved 6 to 64;
 }
 
-message NoopCommand { reserved 1 to 64; }
-
 message PrintThreadStackTraceCommand {
-  string command_id = 1;
-
+  int64 command_id = 1;
   reserved 2 to 64;
 }
 
-message ReportThreadStackTraceRequest {
-  string command_id = 1;
+message ThreadStackTrace {
+  int64 command_id = 1;
   string thread_stack_trace = 2;
 
   reserved 3 to 64;
 }
 
-message ReportThreadStackTraceResponse {
-  ResponseCommon common = 1;
-
-  reserved 2 to 64;
-}
-
-message VerifyMessageConsumptionCommand {
-  string command_id = 1;
+message VerifyMessageCommand {
+  int64 command_id = 1;
   Message message = 2;
 
   reserved 3 to 64;
 }
 
-message ReportMessageConsumptionResultRequest {
-  string command_id = 1;
-
-  // 1. Return `INVALID_ARGUMENT` if message is corrupted.
-  // 2. Return `INTERNAL` if failed to consume message.
-  // 3. Return `OK` if success.
-  google.rpc.Status status = 2;
+message VerifyMessageResult {
+  int64 command_id = 1;
+  ResponseCommon common = 2;
 
   reserved 3 to 64;
 }
 
-message ReportMessageConsumptionResultResponse {
-  ResponseCommon common = 1;
-
-  reserved 2 to 64;
-}
-
 message RecoverOrphanedTransactionCommand {
-  Message orphaned_transactional_message = 1;
-  string transaction_id = 2;
+  int64 command_id = 1;
+  Message orphaned_transactional_message = 2;
+  string transaction_id = 3;
 
-  reserved 3 to 64;
+  reserved 4 to 64;
 }
 
-message PollCommandRequest {
+message Settings {
   string client_id = 1;
   repeated Resource topics = 2;
   oneof group {
@@ -342,19 +324,26 @@ message PollCommandRequest {
   reserved 5 to 64;
 }
 
-message PollCommandResponse {
-  oneof type {
-    // Default command when no new command need to be delivered.
-    NoopCommand noop_command = 1;
+message TelemetryCommand {
+
+  oneof command {
+    Settings settings = 1;
+
+    // Request client to recover the orphaned transaction message.
+    RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 2;
+
     // Request client to print thread stack trace.
-    PrintThreadStackTraceCommand print_thread_stack_trace_command = 2;
+    PrintThreadStackTraceCommand print_thread_stack_trace_command = 3;
+
+    ThreadStackTrace thread_stack_trace = 4;
+    
     // Request client to verify the consumption of the appointed message.
-    VerifyMessageConsumptionCommand verify_message_consumption_command = 3;
-    // Request client to recover the orphaned transaction message.
-    RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 4;
+    VerifyMessageCommand verify_message_command = 5;
+
+    VerifyMessageResult verify_message_result = 6;
   }
 
-  reserved 5 to 64;
+  reserved 7 to 64;
 }
 
 message NotifyClientTerminationRequest {
@@ -463,7 +452,7 @@ service MessagingService {
   // or consumer group doesn't exist, returns `NOT_FOUND`. If there is no new
   // message in the specific topic, returns `OK` with an empty message set.
   // Please note that client may suffer from false empty responses.
-  rpc ReceiveMessage(ReceiveMessageRequest) returns (ReceiveMessageResponse) {}
+  rpc ReceiveMessage(ReceiveMessageRequest) returns (stream ReceiveMessageResponse) {}
 
   // Acknowledges the message associated with the `receipt_handle` or `offset`
   // in the `AckMessageRequest`, it means the message has been successfully
@@ -508,33 +497,13 @@ service MessagingService {
   // Please note that client may suffer from false empty responses.
   rpc PullMessage(PullMessageRequest) returns (PullMessageResponse) {}
 
-  // Multiplexing RPC(s) for various polling requests, which issue different
-  // commands to client.
-  //
-  // Sometimes client may need to receive and process the command from server.
-  // To prevent the complexity of streaming RPC(s), a unary RPC using
-  // long-polling is another solution.
-  //
-  // To mark the request-response of corresponding command, `command_id` in
-  // message is recorded in the subsequent RPC(s). For example, after receiving
-  // command of printing thread stack trace, client would send
-  // `ReportMessageConsumptionResultRequest` to server, which contain both of
-  // the stack trace and `command_id`.
-  //
-  // At same time, `NoopCommand` is delivered from server when no new command is
-  // needed, it is essential for client to maintain the ping-pong.
-  //
-  rpc PollCommand(PollCommandRequest) returns (PollCommandResponse) {}
-
-  // After receiving the corresponding polling command, the thread stack trace
-  // is reported to the server.
-  rpc ReportThreadStackTrace(ReportThreadStackTraceRequest)
-      returns (ReportThreadStackTraceResponse) {}
-
-  // After receiving the corresponding polling command, the consumption result
-  // of appointed message is reported to the server.
-  rpc ReportMessageConsumptionResult(ReportMessageConsumptionResultRequest)
-      returns (ReportMessageConsumptionResultResponse) {}
+  // Once a client starts, it immediately establishes bi-directional streaming
+  // RPCs with brokers, reporting its settings as the initial command.
+  // 
+  // When servers need to inspect client status, they issue telemetry commands
+  // to clients. After executing the received instructions, clients shall
+  // report command execution results through client-side streams.
+  rpc Telemetry(stream TelemetryCommand) returns (stream TelemetryCommand) {}
 
   // Notify the server that the client is terminated.
   rpc NotifyClientTermination(NotifyClientTerminationRequest)

[rocketmq-apis] 01/02: Upgrade version to 2.0

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch v2
in repository https://gitbox.apache.org/repos/asf/rocketmq-apis.git

commit 9e158405a18f50e6cd1cc1c47904986b44f67029
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Fri Mar 18 19:34:25 2022 +0800

    Upgrade version to 2.0
---
 .bazelversion                               |  2 +-
 BUILD.bazel                                 |  4 ++--
 apache/rocketmq/{v1 => v2}/admin.proto      | 13 +++++--------
 apache/rocketmq/{v1 => v2}/definition.proto |  6 +++---
 apache/rocketmq/{v1 => v2}/service.proto    |  8 ++++----
 cpp/BUILD.bazel                             |  2 +-
 java/BUILD.bazel                            |  2 +-
 7 files changed, 17 insertions(+), 20 deletions(-)

diff --git a/.bazelversion b/.bazelversion
index fae6e3d..af8c8ec 100644
--- a/.bazelversion
+++ b/.bazelversion
@@ -1 +1 @@
-4.2.1
+4.2.2
diff --git a/BUILD.bazel b/BUILD.bazel
index f4c1f9b..a9d1cb6 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -1,8 +1,8 @@
 package(default_visibility = ["//visibility:public"])
 
 proto_library(
-    name = "rocketmq_v1_proto",
-    srcs = glob(["apache/rocketmq/v1/*.proto"]),
+    name = "rocketmq_v2_proto",
+    srcs = glob(["apache/rocketmq/v2/*.proto"]),
     deps = [
         "@com_google_protobuf//:empty_proto",
         "@com_google_protobuf//:field_mask_proto",
diff --git a/apache/rocketmq/v1/admin.proto b/apache/rocketmq/v2/admin.proto
similarity index 86%
rename from apache/rocketmq/v1/admin.proto
rename to apache/rocketmq/v2/admin.proto
index 283ca82..7dbb702 100644
--- a/apache/rocketmq/v1/admin.proto
+++ b/apache/rocketmq/v2/admin.proto
@@ -15,12 +15,12 @@
 
 syntax = "proto3";
 
-package apache.rocketmq.v1;
+package apache.rocketmq.v2;
 
 option cc_enable_arenas = true;
-option csharp_namespace = "Apache.Rocketmq.V1";
+option csharp_namespace = "Apache.Rocketmq.V2";
 option java_multiple_files = true;
-option java_package = "apache.rocketmq.v1";
+option java_package = "apache.rocketmq.v2";
 option java_generate_equals_and_hash = true;
 option java_string_check_utf8 = true;
 option java_outer_classname = "MQAdmin";
@@ -36,11 +36,8 @@ message ChangeLogLevelRequest {
   Level level = 1;
 }
 
-message ChangeLogLevelResponse {
-  string remark = 1;
-}
+message ChangeLogLevelResponse { string remark = 1; }
 
 service Admin {
-  rpc ChangeLogLevel(ChangeLogLevelRequest) returns (ChangeLogLevelResponse) {
-  }
+  rpc ChangeLogLevel(ChangeLogLevelRequest) returns (ChangeLogLevelResponse) {}
 }
\ No newline at end of file
diff --git a/apache/rocketmq/v1/definition.proto b/apache/rocketmq/v2/definition.proto
similarity index 98%
rename from apache/rocketmq/v1/definition.proto
rename to apache/rocketmq/v2/definition.proto
index 723c1e3..5e2d948 100644
--- a/apache/rocketmq/v1/definition.proto
+++ b/apache/rocketmq/v2/definition.proto
@@ -18,11 +18,11 @@ syntax = "proto3";
 import "google/protobuf/timestamp.proto";
 import "google/protobuf/duration.proto";
 
-package apache.rocketmq.v1;
+package apache.rocketmq.v2;
 
-option csharp_namespace = "Apache.Rocketmq.V1";
+option csharp_namespace = "Apache.Rocketmq.V2";
 option java_multiple_files = true;
-option java_package = "apache.rocketmq.v1";
+option java_package = "apache.rocketmq.v2";
 option java_generate_equals_and_hash = true;
 option java_string_check_utf8 = true;
 option java_outer_classname = "MQDomain";
diff --git a/apache/rocketmq/v1/service.proto b/apache/rocketmq/v2/service.proto
similarity index 99%
rename from apache/rocketmq/v1/service.proto
rename to apache/rocketmq/v2/service.proto
index 185eddf..896ce41 100644
--- a/apache/rocketmq/v1/service.proto
+++ b/apache/rocketmq/v2/service.proto
@@ -20,13 +20,13 @@ import "google/protobuf/timestamp.proto";
 import "google/rpc/error_details.proto";
 import "google/rpc/status.proto";
 
-import "apache/rocketmq/v1/definition.proto";
+import "apache/rocketmq/v2/definition.proto";
 
-package apache.rocketmq.v1;
+package apache.rocketmq.v2;
 
-option csharp_namespace = "Apache.Rocketmq.V1";
+option csharp_namespace = "Apache.Rocketmq.V2";
 option java_multiple_files = true;
-option java_package = "apache.rocketmq.v1";
+option java_package = "apache.rocketmq.v2";
 option java_generate_equals_and_hash = true;
 option java_string_check_utf8 = true;
 option java_outer_classname = "MQService";
diff --git a/cpp/BUILD.bazel b/cpp/BUILD.bazel
index 8c03db4..5a0726b 100644
--- a/cpp/BUILD.bazel
+++ b/cpp/BUILD.bazel
@@ -26,5 +26,5 @@ proto_plugin(
 
 cpp_grpc_library(
     name = "rocketmq-proto-cpp-library",
-    deps = ["//:rocketmq_v1_proto"]
+    deps = ["//:rocketmq_v2_proto"]
 )
\ No newline at end of file
diff --git a/java/BUILD.bazel b/java/BUILD.bazel
index 95cf665..93daa4c 100644
--- a/java/BUILD.bazel
+++ b/java/BUILD.bazel
@@ -20,7 +20,7 @@ proto_plugin(
 java_grpc_compile(
     name = "rocketmq_proto_src",
     deps = [
-        "//:rocketmq_v1_proto",
+        "//:rocketmq_v2_proto",
     ]
 )