You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/03/14 11:06:10 UTC

[rocketmq-apis] branch main updated: WIP: support pull consumer (#67)

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-apis.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d5d7e0  WIP: support pull consumer (#67)
6d5d7e0 is described below

commit 6d5d7e00273e78bceb04fc10b1439f2e8d4c22f6
Author: Aaron Ai <ya...@alibaba-inc.com>
AuthorDate: Tue Mar 14 19:06:04 2023 +0800

    WIP: support pull consumer (#67)
    
    * WIP: support pull consumer
    
    * Remove redundant QueryOffsetByGroupRequest/QueryOffsetByGroupResponse
---
 apache/rocketmq/v2/definition.proto | 11 ++++++++
 apache/rocketmq/v2/service.proto    | 56 +++++++++++++++++++++++++++++++++++++
 2 files changed, 67 insertions(+)

diff --git a/apache/rocketmq/v2/definition.proto b/apache/rocketmq/v2/definition.proto
index d10418d..e8bb655 100644
--- a/apache/rocketmq/v2/definition.proto
+++ b/apache/rocketmq/v2/definition.proto
@@ -549,4 +549,15 @@ message Metric {
 
   // The endpoint that client metrics should be exported to, which is required if the switch is on.
   optional Endpoints endpoints = 2;
+}
+
+enum QueryOffsetPolicy {
+  // Use this option if the client wishes to play back all existing messages.
+  BEGINNING = 0;
+
+  // Use this option if the client wishes to skip all existing messages.
+  END = 1;
+
+  // Use this option if a time-based seek is intended.
+  TIMESTAMP = 2;
 }
\ No newline at end of file
diff --git a/apache/rocketmq/v2/service.proto b/apache/rocketmq/v2/service.proto
index 6d203d4..f662f76 100644
--- a/apache/rocketmq/v2/service.proto
+++ b/apache/rocketmq/v2/service.proto
@@ -244,6 +244,54 @@ message ChangeInvisibleDurationResponse {
   string receipt_handle = 2;
 }
 
+message PullMessageRequest {
+  Resource group = 1;
+  MessageQueue message_queue = 2;
+  int64 offset = 3;
+  int32 batch_size = 4;
+  FilterExpression filter_expression = 5;
+  google.protobuf.Duration long_polling_timeout = 6;
+}
+
+message PullMessageResponse {
+  oneof content {
+    Status status = 1;
+    Message message = 2;
+    int64 next_offset = 3;
+  }
+}
+
+message UpdateOffsetRequest {
+  Resource group = 1;
+  MessageQueue message_queue = 2;
+  int64 offset = 3;
+}
+
+message UpdateOffsetResponse {
+  Status status = 1;
+}
+
+message GetOffsetRequest {
+  Resource group = 1;
+  MessageQueue message_queue = 2;
+}
+
+message GetOffsetResponse {
+  Status status = 1;
+  int64 offset = 2;
+}
+
+message QueryOffsetRequest {
+  MessageQueue message_queue = 1;
+  QueryOffsetPolicy query_offset_policy = 2;
+  optional google.protobuf.Timestamp timestamp = 3;
+}
+
+message QueryOffsetResponse {
+  Status status = 1;
+  int64 offset = 2;
+}
+
 // For all the RPCs in MessagingService, the following error handling policies
 // apply:
 //
@@ -329,6 +377,14 @@ service MessagingService {
   rpc ForwardMessageToDeadLetterQueue(ForwardMessageToDeadLetterQueueRequest)
       returns (ForwardMessageToDeadLetterQueueResponse) {}
 
+  rpc PullMessage(PullMessageRequest) returns (stream PullMessageResponse) {}
+
+  rpc UpdateOffset(UpdateOffsetRequest) returns (UpdateOffsetResponse) {}
+
+  rpc GetOffset(GetOffsetRequest) returns (GetOffsetResponse) {}
+
+  rpc QueryOffset(QueryOffsetRequest) returns (QueryOffsetResponse) {}
+
   // Commits or rollback one transactional message.
   rpc EndTransaction(EndTransactionRequest) returns (EndTransactionResponse) {}