You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/07/29 06:19:35 UTC
[rocketmq-clients] 01/01: Set up Rust boilerplate code
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch rust_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 299a75486bafbf3b62c9ee8799ed100c46f83c1b
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Mon Jul 4 19:04:14 2022 +0800
Set up Rust boilerplate code
---
.gitignore | 7 +
rust/Cargo.toml | 21 ++
rust/build.rs | 14 +
rust/proto/apache/rocketmq/v2/admin.proto | 43 +++
rust/proto/apache/rocketmq/v2/definition.proto | 450 ++++++++++++++++++++++++
rust/proto/apache/rocketmq/v2/service.proto | 455 +++++++++++++++++++++++++
rust/src/client.rs | 410 ++++++++++++++++++++++
rust/src/command.rs | 13 +
rust/src/error.rs | 26 ++
rust/src/lib.rs | 26 ++
rust/src/main.rs | 20 ++
rust/src/pb/README.md | 5 +
rust/src/producer.rs | 25 ++
13 files changed, 1515 insertions(+)
diff --git a/.gitignore b/.gitignore
index 68a6ad8..1ea2e09 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,3 +27,10 @@ dependency-reduced-pom.xml
# C#
obj/
+# Java
+java/client/src/main/java/org/apache/rocketmq/client/java/example/
+
+# Rust
+rust/target
+rust/Cargo.lock
+rust/src/pb/*.rs
\ No newline at end of file
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
new file mode 100644
index 0000000..b290d64
--- /dev/null
+++ b/rust/Cargo.toml
@@ -0,0 +1,21 @@
+[package]
+name = "rocketmq"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+tokio = { version = "1", features = ["full"] }
+tonic = {version = "0.7", features = ["tls", "default", "channel", "tls-roots"]}
+prost = "0.10"
+prost-types = "0.10"
+thiserror = "1.0"
+slog = {version = "2.7.0", features=["max_level_trace", "release_max_level_info"]}
+slog-term = "2.9.0"
+slog-async = "2.7.0"
+parking_lot = "0.12"
+hmac = "0.12"
+hostname = "0.3.1"
+tokio-rustls = {version = "0.23", features = ["default", "dangerous_configuration"] }
+
+[build-dependencies]
+tonic-build = "0.7"
\ No newline at end of file
diff --git a/rust/build.rs b/rust/build.rs
new file mode 100644
index 0000000..eebab8a
--- /dev/null
+++ b/rust/build.rs
@@ -0,0 +1,14 @@
+fn main() {
+ tonic_build::configure()
+ .build_client(true)
+ .build_server(false)
+ .out_dir("src/pb")
+ .compile(
+ &[
+ "proto/apache/rocketmq/v2/service.proto",
+ "proto/apache/rocketmq/v2/admin.proto",
+ ],
+ &["proto"],
+ )
+ .unwrap();
+}
diff --git a/rust/proto/apache/rocketmq/v2/admin.proto b/rust/proto/apache/rocketmq/v2/admin.proto
new file mode 100644
index 0000000..7dbb702
--- /dev/null
+++ b/rust/proto/apache/rocketmq/v2/admin.proto
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package apache.rocketmq.v2;
+
+option cc_enable_arenas = true;
+option csharp_namespace = "Apache.Rocketmq.V2";
+option java_multiple_files = true;
+option java_package = "apache.rocketmq.v2";
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+option java_outer_classname = "MQAdmin";
+
+message ChangeLogLevelRequest {
+ enum Level {
+ TRACE = 0;
+ DEBUG = 1;
+ INFO = 2;
+ WARN = 3;
+ ERROR = 4;
+ }
+ Level level = 1;
+}
+
+message ChangeLogLevelResponse { string remark = 1; }
+
+service Admin {
+ rpc ChangeLogLevel(ChangeLogLevelRequest) returns (ChangeLogLevelResponse) {}
+}
\ No newline at end of file
diff --git a/rust/proto/apache/rocketmq/v2/definition.proto b/rust/proto/apache/rocketmq/v2/definition.proto
new file mode 100644
index 0000000..86bb3dd
--- /dev/null
+++ b/rust/proto/apache/rocketmq/v2/definition.proto
@@ -0,0 +1,450 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+import "google/protobuf/timestamp.proto";
+import "google/protobuf/duration.proto";
+
+package apache.rocketmq.v2;
+
+option csharp_namespace = "Apache.Rocketmq.V2";
+option java_multiple_files = true;
+option java_package = "apache.rocketmq.v2";
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+option java_outer_classname = "MQDomain";
+
+enum TransactionResolution {
+ TRANSACTION_RESOLUTION_UNSPECIFIED = 0;
+ COMMIT = 1;
+ ROLLBACK = 2;
+}
+
+enum TransactionSource {
+ SOURCE_UNSPECIFIED = 0;
+ SOURCE_CLIENT = 1;
+ SOURCE_SERVER_CHECK = 2;
+}
+
+enum Permission {
+ PERMISSION_UNSPECIFIED = 0;
+ NONE = 1;
+ READ = 2;
+ WRITE = 3;
+ READ_WRITE = 4;
+}
+
+enum FilterType {
+ FILTER_TYPE_UNSPECIFIED = 0;
+ TAG = 1;
+ SQL = 2;
+}
+
+message FilterExpression {
+ FilterType type = 1;
+ string expression = 2;
+}
+
+message RetryPolicy {
+ int32 max_attempts = 1;
+ oneof strategy {
+ ExponentialBackoff exponential_backoff = 2;
+ CustomizedBackoff customized_backoff = 3;
+ }
+}
+
+// https://en.wikipedia.org/wiki/Exponential_backoff
+message ExponentialBackoff {
+ google.protobuf.Duration initial = 1;
+ google.protobuf.Duration max = 2;
+ float multiplier = 3;
+}
+
+message CustomizedBackoff {
+ // To support classic backoff strategy which is arbitary defined by end users.
+ // Typical values are: `1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h`
+ repeated google.protobuf.Duration next = 1;
+}
+
+message Resource {
+ string resource_namespace = 1;
+
+ // Resource name identifier, which remains unique within the abstract resource
+ // namespace.
+ string name = 2;
+}
+
+message SubscriptionEntry {
+ Resource topic = 1;
+ FilterExpression expression = 2;
+}
+
+enum AddressScheme {
+ ADDRESS_SCHEME_UNSPECIFIED = 0;
+ IPv4 = 1;
+ IPv6 = 2;
+ DOMAIN_NAME = 3;
+}
+
+message Address {
+ string host = 1;
+ int32 port = 2;
+}
+
+message Endpoints {
+ AddressScheme scheme = 1;
+ repeated Address addresses = 2;
+}
+
+message Broker {
+ // Name of the broker
+ string name = 1;
+
+ // Broker index. Canonically, index = 0 implies that the broker is playing
+ // leader role while brokers with index > 0 play follower role.
+ int32 id = 2;
+
+ // Address of the broker, complying with the following scheme
+ // 1. dns:[//authority/]host[:port]
+ // 2. ipv4:address[:port][,address[:port],...] – IPv4 addresses
+ // 3. ipv6:address[:port][,address[:port],...] – IPv6 addresses
+ Endpoints endpoints = 3;
+}
+
+message MessageQueue {
+ Resource topic = 1;
+ int32 id = 2;
+ Permission permission = 3;
+ Broker broker = 4;
+ repeated MessageType accept_message_types = 5;
+}
+
+enum MessageType {
+ MESSAGE_TYPE_UNSPECIFIED = 0;
+
+ NORMAL = 1;
+
+ // Sequenced message
+ FIFO = 2;
+
+ // Messages that are delivered after the specified duration.
+ DELAY = 3;
+
+ // Messages that are transactional. Only committed messages are delivered to
+ // subscribers.
+ TRANSACTION = 4;
+}
+
+enum DigestType {
+ DIGEST_TYPE_UNSPECIFIED = 0;
+
+ // CRC algorithm achieves goal of detecting random data error with lowest
+ // computation overhead.
+ CRC32 = 1;
+
+ // MD5 algorithm achieves good balance between collision rate and computation
+ // overhead.
+ MD5 = 2;
+
+ // SHA-family has substantially fewer collision with fair amount of
+ // computation.
+ SHA1 = 3;
+}
+
+// When publishing messages to or subscribing messages from brokers, clients
+// shall include or validate digests of message body to ensure data integrity.
+//
+// For message publishing, when an invalid digest were detected, brokers need
+// respond client with BAD_REQUEST.
+//
+// For messages subscription, when an invalid digest were detected, consumers
+// need to handle this case according to message type:
+// 1) Standard messages should be negatively acknowledged instantly, causing
+// immediate re-delivery; 2) FIFO messages require special RPC, to re-fetch
+// previously acquired messages batch;
+//
+// Message consumption model also affects how invalid digest are handled. When
+// messages are consumed in broadcasting way,
+// TODO: define semantics of invalid-digest-when-broadcasting.
+message Digest {
+ DigestType type = 1;
+ string checksum = 2;
+}
+
+enum ClientType {
+ CLIENT_TYPE_UNSPECIFIED = 0;
+ PRODUCER = 1;
+ PUSH_CONSUMER = 2;
+ SIMPLE_CONSUMER = 3;
+}
+
+enum Encoding {
+ ENCODING_UNSPECIFIED = 0;
+
+ IDENTITY = 1;
+
+ GZIP = 2;
+}
+
+message SystemProperties {
+ // Tag, which is optional.
+ optional string tag = 1;
+
+ // Message keys
+ repeated string keys = 2;
+
+ // Message identifier, client-side generated, remains unique.
+ // if message_id is empty, the send message request will be aborted with
+ // status `INVALID_ARGUMENT`
+ string message_id = 3;
+
+ // Message body digest
+ Digest body_digest = 4;
+
+ // Message body encoding. Candidate options are identity, gzip, snappy etc.
+ Encoding body_encoding = 5;
+
+ // Message type, normal, FIFO or transactional.
+ MessageType message_type = 6;
+
+ // Message born time-point.
+ google.protobuf.Timestamp born_timestamp = 7;
+
+ // Message born host. Valid options are IPv4, IPv6 or client host domain name.
+ string born_host = 8;
+
+ // Time-point at which the message is stored in the broker, which is absent
+ // for message publishing.
+ optional google.protobuf.Timestamp store_timestamp = 9;
+
+ // The broker that stores this message. It may be broker name, IP or arbitrary
+ // identifier that uniquely identify the server.
+ string store_host = 10;
+
+ // Time-point at which broker delivers to clients, which is optional.
+ optional google.protobuf.Timestamp delivery_timestamp = 11;
+
+ // If a message is acquired by way of POP, this field holds the receipt,
+ // which is absent for message publishing.
+ // Clients use the receipt to acknowledge or negatively acknowledge the
+ // message.
+ optional string receipt_handle = 12;
+
+ // Message queue identifier in which a message is physically stored.
+ int32 queue_id = 13;
+
+ // Message-queue offset at which a message is stored, which is absent for
+ // message publishing.
+ optional int64 queue_offset = 14;
+
+ // Period of time servers would remain invisible once a message is acquired.
+ optional google.protobuf.Duration invisible_duration = 15;
+
+ // Business code may failed to process messages for the moment. Hence, clients
+ // may request servers to deliver them again using certain back-off strategy,
+ // the attempt is 1 not 0 if message is delivered first time, and it is absent
+ // for message publishing.
+ optional int32 delivery_attempt = 16;
+
+ // Define the group name of message in the same topic, which is optional.
+ optional string message_group = 17;
+
+ // Trace context for each message, which is optional.
+ optional string trace_context = 18;
+
+ // If a transactional message stay unresolved for more than
+ // `transaction_orphan_threshold`, it would be regarded as an
+ // orphan. Servers that manages orphan messages would pick up
+ // a capable publisher to resolve
+ optional google.protobuf.Duration orphaned_transaction_recovery_duration = 19;
+}
+
+message Message {
+ Resource topic = 1;
+
+ // User defined key-value pairs.
+ // If user_properties contain the reserved keys by RocketMQ,
+ // the send message request will be aborted with status `INVALID_ARGUMENT`.
+ // See below links for the reserved keys
+ // https://github.com/apache/rocketmq/blob/master/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java#L58
+ map<string, string> user_properties = 2;
+
+ SystemProperties system_properties = 3;
+
+ bytes body = 4;
+}
+
+message Assignment {
+ MessageQueue message_queue = 1;
+}
+
+enum Code {
+ CODE_UNSPECIFIED = 0;
+
+ // Generic code for success.
+ OK = 20000;
+
+ // Generic code for multiple return results.
+ MULTIPLE_RESULTS = 30000;
+
+ // Generic code for bad request, indicating that required fields or headers are missing.
+ BAD_REQUEST = 40000;
+ // Format of access point is illegal.
+ ILLEGAL_ACCESS_POINT = 40001;
+ // Format of topic is illegal.
+ ILLEGAL_TOPIC = 40002;
+ // Format of consumer group is illegal.
+ ILLEGAL_CONSUMER_GROUP = 40003;
+ // Format of message tag is illegal.
+ ILLEGAL_MESSAGE_TAG = 40004;
+ // Format of message key is illegal.
+ ILLEGAL_MESSAGE_KEY = 40005;
+ // Format of message group is illegal.
+ ILLEGAL_MESSAGE_GROUP = 40006;
+ // Format of message property key is illegal.
+ ILLEGAL_MESSAGE_PROPERTY_KEY = 40007;
+ // Transaction id is invalid.
+ INVALID_TRANSACTION_ID = 40008;
+ // Format of message id is illegal.
+ ILLEGAL_MESSAGE_ID = 40009;
+ // Format of filter expression is illegal.
+ ILLEGAL_FILTER_EXPRESSION = 40010;
+ // The invisible time of request is invalid.
+ ILLEGAL_INVISIBLE_TIME = 40011;
+ // The delivery timestamp of message is invalid.
+ ILLEGAL_DELIVERY_TIME = 40012;
+ // Receipt handle of message is invalid.
+ INVALID_RECEIPT_HANDLE = 40013;
+ // Message property conflicts with its type.
+ MESSAGE_PROPERTY_CONFLICT_WITH_TYPE = 40014;
+ // Client type could not be recognized.
+ UNRECOGNIZED_CLIENT_TYPE = 40015;
+ // Message is corrupted.
+ MESSAGE_CORRUPTED = 40016;
+ // Request is rejected due to missing of x-mq-client-id header.
+ CLIENT_ID_REQUIRED = 40017;
+
+ // Generic code indicates that the client request lacks valid authentication
+ // credentials for the requested resource.
+ UNAUTHORIZED = 40100;
+
+ // Generic code indicates that the account is suspended due to overdue of payment.
+ PAYMENT_REQUIRED = 40200;
+
+ // Generic code for the case that user does not have the permission to operate.
+ FORBIDDEN = 40300;
+
+ // Generic code for resource not found.
+ NOT_FOUND = 40400;
+ // Message not found from server.
+ MESSAGE_NOT_FOUND = 40401;
+ // Topic resource does not exist.
+ TOPIC_NOT_FOUND = 40402;
+ // Consumer group resource does not exist.
+ CONSUMER_GROUP_NOT_FOUND = 40403;
+
+ // Generic code representing client side timeout when connecting to, reading data from, or write data to server.
+ REQUEST_TIMEOUT = 40800;
+
+ // Generic code represents that the request entity is larger than limits defined by server.
+ PAYLOAD_TOO_LARGE = 41300;
+ // Message body size exceeds the threshold.
+ MESSAGE_BODY_TOO_LARGE = 41301;
+
+ // Generic code for use cases where pre-conditions are not met.
+ // For example, if a producer instance is used to publish messages without prior start() invocation,
+ // this error code will be raised.
+ PRECONDITION_FAILED = 42800;
+
+ // Generic code indicates that too many requests are made in short period of duration.
+ // Requests are throttled.
+ TOO_MANY_REQUESTS = 42900;
+
+ // Generic code for the case that the server is unwilling to process the request because its header fields are too large.
+ // The request may be resubmitted after reducing the size of the request header fields.
+ REQUEST_HEADER_FIELDS_TOO_LARGE = 43100;
+ // Message properties total size exceeds the threshold.
+ MESSAGE_PROPERTIES_TOO_LARGE = 43101;
+
+ // Generic code indicates that server/client encountered an unexpected
+ // condition that prevented it from fulfilling the request.
+ INTERNAL_ERROR = 50000;
+ // Code indicates that the server encountered an unexpected condition
+ // that prevented it from fulfilling the request.
+ // This error response is a generic "catch-all" response.
+ // Usually, this indicates the server cannot find a better alternative
+ // error code to response. Sometimes, server administrators log error
+ // responses like the 500 status code with more details about the request
+ // to prevent the error from happening again in the future.
+ //
+ // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500
+ INTERNAL_SERVER_ERROR = 50001;
+ // The HA-mechanism is not working now.
+ HA_NOT_AVAILABLE = 50002;
+
+ // Generic code means that the server or client does not support the
+ // functionality required to fulfill the request.
+ NOT_IMPLEMENTED = 50100;
+
+ // Generic code represents that the server, which acts as a gateway or proxy,
+ // does not get an satisfied response in time from its upstream servers.
+ // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/504
+ PROXY_TIMEOUT = 50400;
+ // Message persistence timeout.
+ MASTER_PERSISTENCE_TIMEOUT = 50401;
+ // Slave persistence timeout.
+ SLAVE_PERSISTENCE_TIMEOUT = 50402;
+
+ // Generic code for unsupported operation.
+ UNSUPPORTED = 50500;
+ // Operation is not allowed in current version.
+ VERSION_UNSUPPORTED = 50501;
+ // Not allowed to verify message. Chances are that you are verifying
+ // a FIFO message, as is violating FIFO semantics.
+ VERIFY_FIFO_MESSAGE_UNSUPPORTED = 50502;
+
+ // Generic code for failed message consumption.
+ FAILED_TO_CONSUME_MESSAGE = 60000;
+}
+
+message Status {
+ Code code = 1;
+ string message = 2;
+}
+
+enum Language {
+ LANGUAGE_UNSPECIFIED = 0;
+ JAVA = 1;
+ CPP = 2;
+ DOT_NET = 3;
+ GOLANG = 4;
+ RUST = 5;
+}
+
+// User Agent
+message UA {
+ // SDK language
+ Language language = 1;
+
+ // SDK version
+ string version = 2;
+
+ // Platform details, including OS name, version, arch etc.
+ string platform = 3;
+
+ // Hostname of the node
+ string hostname = 4;
+}
\ No newline at end of file
diff --git a/rust/proto/apache/rocketmq/v2/service.proto b/rust/proto/apache/rocketmq/v2/service.proto
new file mode 100644
index 0000000..42bf332
--- /dev/null
+++ b/rust/proto/apache/rocketmq/v2/service.proto
@@ -0,0 +1,455 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+import "google/protobuf/duration.proto";
+import "google/protobuf/timestamp.proto";
+
+import "apache/rocketmq/v2/definition.proto";
+
+package apache.rocketmq.v2;
+
+option csharp_namespace = "Apache.Rocketmq.V2";
+option java_multiple_files = true;
+option java_package = "apache.rocketmq.v2";
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+option java_outer_classname = "MQService";
+
+// Topics are destination of messages to publish to or subscribe from. Similar
+// to domain names, they will be addressable after resolution through the
+// provided access point.
+//
+// Access points are usually the addresses of name servers, which fulfill
+// service discovery, load-balancing and other auxiliary services. Name servers
+// receive periodic heartbeats from affiliate brokers and erase those which
+// failed to maintain alive status.
+//
+// Name servers answer queries of QueryRouteRequest, responding clients with
+// addressable message-queues, which they may directly publish messages to or
+// subscribe messages from.
+//
+// QueryRouteRequest shall include source endpoints, aka, configured
+// access-point, which annotates tenant-id, instance-id or other
+// vendor-specific settings. Purpose-built name servers may respond customized
+// results based on these particular requirements.
+message QueryRouteRequest {
+ Resource topic = 1;
+ Endpoints endpoints = 2;
+}
+
+message QueryRouteResponse {
+ Status status = 1;
+
+ repeated MessageQueue message_queues = 2;
+}
+
+message SendMessageRequest {
+ repeated Message messages = 1;
+}
+
+message SendResultEntry {
+ Status status = 1;
+ string message_id = 2;
+ string transaction_id = 3;
+ int64 offset = 4;
+}
+
+message SendMessageResponse {
+ Status status = 1;
+
+ // Some implementation may have partial failure issues. Client SDK developers are expected to inspect
+ // each entry for best certainty.
+ repeated SendResultEntry entries = 2;
+}
+
+message QueryAssignmentRequest {
+ Resource topic = 1;
+ Resource group = 2;
+ Endpoints endpoints = 3;
+}
+
+message QueryAssignmentResponse {
+ Status status = 1;
+ repeated Assignment assignments = 2;
+}
+
+message ReceiveMessageRequest {
+ Resource group = 1;
+ MessageQueue message_queue = 2;
+ FilterExpression filter_expression = 3;
+ int32 batch_size = 4;
+ // Required if client type is simple consumer.
+ optional google.protobuf.Duration invisible_duration = 5;
+ // For message auto renew and clean
+ bool auto_renew = 6;
+}
+
+message ReceiveMessageResponse {
+ oneof content {
+ Status status = 1;
+ Message message = 2;
+ // The timestamp that brokers start to deliver status line or message.
+ google.protobuf.Timestamp delivery_timestamp = 3;
+ }
+}
+
+message AckMessageEntry {
+ string message_id = 1;
+ string receipt_handle = 2;
+}
+
+message AckMessageRequest {
+ Resource group = 1;
+ Resource topic = 2;
+ repeated AckMessageEntry entries = 3;
+}
+
+message AckMessageResultEntry {
+ string message_id = 1;
+ string receipt_handle = 2;
+
+ // Acknowledge result may be acquired through inspecting
+ // `status.code`; In case acknowledgement failed, `status.message`
+ // is the explanation of the failure.
+ Status status = 3;
+}
+
+message AckMessageResponse {
+ // RPC tier status, which is used to represent RPC-level errors including
+ // authentication, authorization, throttling and other general failures.
+ Status status = 1;
+
+ repeated AckMessageResultEntry entries = 2;
+}
+
+message ForwardMessageToDeadLetterQueueRequest {
+ Resource group = 1;
+ Resource topic = 2;
+ string receipt_handle = 3;
+ string message_id = 4;
+ int32 delivery_attempt = 5;
+ int32 max_delivery_attempts = 6;
+}
+
+message ForwardMessageToDeadLetterQueueResponse {
+ Status status = 1;
+}
+
+message HeartbeatRequest {
+ optional Resource group = 1;
+ ClientType client_type = 2;
+}
+
+message HeartbeatResponse {
+ Status status = 1;
+}
+
+message EndTransactionRequest {
+ Resource topic = 1;
+ string message_id = 2;
+ string transaction_id = 3;
+ TransactionResolution resolution = 4;
+ TransactionSource source = 5;
+ string trace_context = 6;
+}
+
+message EndTransactionResponse {
+ Status status = 1;
+}
+
+message PrintThreadStackTraceCommand {
+ string nonce = 1;
+}
+
+message ThreadStackTrace {
+ string nonce = 1;
+ optional string thread_stack_trace = 2;
+}
+
+message VerifyMessageCommand {
+ string nonce = 1;
+ Message message = 2;
+}
+
+message VerifyMessageResult {
+ string nonce = 1;
+}
+
+message RecoverOrphanedTransactionCommand {
+ Message message = 1;
+ string transaction_id = 2;
+}
+
+message Publishing {
+ // Publishing settings below here is appointed by client, thus it is
+ // unnecessary for server to push at present.
+ //
+ // List of topics to which messages will publish to.
+ repeated Resource topics = 1;
+
+ // If the message body size exceeds `max_body_size`, broker servers would
+ // reject the request. As a result, it is advisable that Producer performs
+ // client-side check validation.
+ int32 max_body_size = 2;
+
+ // When `validate_message_type` flag set `false`, no need to validate message's type
+ // with messageQueue's `accept_message_types` before publising.
+ bool validate_message_type = 3;
+}
+
+message Subscription {
+ // Subscription settings below here is appointed by client, thus it is
+ // unnecessary for server to push at present.
+ //
+ // Consumer group.
+ optional Resource group = 1;
+
+ // Subscription for consumer.
+ repeated SubscriptionEntry subscriptions = 2;
+
+ // Subscription settings below here are from server, it is essential for
+ // server to push.
+ //
+ // When FIFO flag is `true`, messages of the same message group are processed
+ // in first-in-first-out manner.
+ //
+ // Brokers will not deliver further messages of the same group utill prior
+ // ones are completely acknowledged.
+ optional bool fifo = 3;
+
+ // Message receive batch size here is essential for push consumer.
+ optional int32 receive_batch_size = 4;
+
+ // Long-polling timeout for `ReceiveMessageRequest`, which is essential for
+ // push consumer.
+ optional google.protobuf.Duration long_polling_timeout = 5;
+}
+
+message Metric {
+ // Indicates that if client should export local metrics to server.
+ bool on = 1;
+
+ // The endpoint that client metrics should be exported to, which is required if the switch is on.
+ optional Endpoints endpoints = 2;
+}
+
+message Settings {
+ // Configurations for all clients.
+ optional ClientType client_type = 1;
+
+ optional Endpoints access_point = 2;
+
+ // If publishing of messages encounters throttling or server internal errors,
+ // publishers should implement automatic retries after progressive longer
+ // back-offs for consecutive errors.
+ //
+ // When processing message fails, `backoff_policy` describes an interval
+ // after which the message should be available to consume again.
+ //
+ // For FIFO messages, the interval should be relatively small because
+ // messages of the same message group would not be readily available utill
+ // the prior one depletes its lifecycle.
+ optional RetryPolicy backoff_policy = 3;
+
+ // Request timeout for RPCs excluding long-polling.
+ optional google.protobuf.Duration request_timeout = 4;
+
+ oneof pub_sub {
+ Publishing publishing = 5;
+
+ Subscription subscription = 6;
+ }
+
+ // User agent details
+ UA user_agent = 7;
+
+ Metric metric = 8;
+}
+
+message TelemetryCommand {
+ optional Status status = 1;
+
+ oneof command {
+ // Client settings
+ Settings settings = 2;
+
+ // These messages are from client.
+ //
+ // Report thread stack trace to server.
+ ThreadStackTrace thread_stack_trace = 3;
+
+ // Report message verify result to server.
+ VerifyMessageResult verify_message_result = 4;
+
+ // There messages are from server.
+ //
+ // Request client to recover the orphaned transaction message.
+ RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 5;
+
+ // Request client to print thread stack trace.
+ PrintThreadStackTraceCommand print_thread_stack_trace_command = 6;
+
+ // Request client to verify the consumption of the appointed message.
+ VerifyMessageCommand verify_message_command = 7;
+ }
+}
+
+message NotifyClientTerminationRequest {
+ // Consumer group, which is absent for producer.
+ optional Resource group = 1;
+}
+
+message NotifyClientTerminationResponse {
+ Status status = 1;
+}
+
+message ChangeInvisibleDurationRequest {
+ Resource group = 1;
+ Resource topic = 2;
+
+ // Unique receipt handle to identify message to change
+ string receipt_handle = 3;
+
+ // New invisible duration
+ google.protobuf.Duration invisible_duration = 4;
+
+ // For message tracing
+ string message_id = 5;
+}
+
+message ChangeInvisibleDurationResponse {
+ Status status = 1;
+
+ // Server may generate a new receipt handle for the message.
+ string receipt_handle = 2;
+}
+
+// For all the RPCs in MessagingService, the following error handling policies
+// apply:
+//
+// If the request doesn't bear a valid authentication credential, return a
+// response with common.status.code == `UNAUTHENTICATED`. If the authenticated
+// user is not granted with sufficient permission to execute the requested
+// operation, return a response with common.status.code == `PERMISSION_DENIED`.
+// If the per-user-resource-based quota is exhausted, return a response with
+// common.status.code == `RESOURCE_EXHAUSTED`. If any unexpected server-side
+// errors raise, return a response with common.status.code == `INTERNAL`.
+service MessagingService {
+ // Queries the route entries of the requested topic in the perspective of the
+ // given endpoints. On success, servers should return a collection of
+ // addressable message-queues. Note servers may return customized route
+ // entries based on endpoints provided.
+ //
+ // If the requested topic doesn't exist, returns `NOT_FOUND`.
+ // If the specific endpoints is empty, returns `INVALID_ARGUMENT`.
+ rpc QueryRoute(QueryRouteRequest) returns (QueryRouteResponse) {
+ }
+
+ // Producer or consumer sends HeartbeatRequest to servers periodically to
+ // keep-alive. Additionally, it also reports client-side configuration,
+ // including topic subscription, load-balancing group name, etc.
+ //
+ // Returns `OK` if success.
+ //
+ // If a client specifies a language that is not yet supported by servers,
+ // returns `INVALID_ARGUMENT`
+ rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {
+ }
+
+ // Delivers messages to brokers.
+ // Clients may further:
+ // 1. Refine a message destination to message-queues which fulfills parts of
+ // FIFO semantic;
+ // 2. Flag a message as transactional, which keeps it invisible to consumers
+ // until it commits;
+ // 3. Time a message, making it invisible to consumers till specified
+ // time-point;
+ // 4. And more...
+ //
+ // Returns message-id or transaction-id with status `OK` on success.
+ //
+ // If the destination topic doesn't exist, returns `NOT_FOUND`.
+ rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {
+ }
+
+ // Queries the assigned route info of a topic for current consumer,
+ // the returned assignment result is decided by server-side load balancer.
+ //
+ // If the corresponding topic doesn't exist, returns `NOT_FOUND`.
+ // If the specific endpoints is empty, returns `INVALID_ARGUMENT`.
+ rpc QueryAssignment(QueryAssignmentRequest) returns (QueryAssignmentResponse) {
+ }
+
+ // Receives messages from the server in batch manner, returns a set of
+ // messages if success. The received messages should be acked or redelivered
+ // after processed.
+ //
+ // If the pending concurrent receive requests exceed the quota of the given
+ // consumer group, returns `UNAVAILABLE`. If the upstream store server hangs,
+ // return `DEADLINE_EXCEEDED` in a timely manner. If the corresponding topic
+ // or consumer group doesn't exist, returns `NOT_FOUND`. If there is no new
+ // message in the specific topic, returns `OK` with an empty message set.
+ // Please note that client may suffer from false empty responses.
+ //
+ // If failed to receive message from remote, server must return only one
+ // `ReceiveMessageResponse` as the reply to the request, whose `Status` indicates
+ // the specific reason of failure, otherwise, the reply is considered successful.
+ rpc ReceiveMessage(ReceiveMessageRequest) returns (stream ReceiveMessageResponse) {
+ }
+
+ // Acknowledges the message associated with the `receipt_handle` or `offset`
+ // in the `AckMessageRequest`, it means the message has been successfully
+ // processed. Returns `OK` if the message server remove the relevant message
+ // successfully.
+ //
+ // If the given receipt_handle is illegal or out of date, returns
+ // `INVALID_ARGUMENT`.
+ rpc AckMessage(AckMessageRequest) returns (AckMessageResponse) {
+ }
+
+ // Forwards one message to dead letter queue if the max delivery attempts is
+ // exceeded by this message at client-side, return `OK` if success.
+ rpc ForwardMessageToDeadLetterQueue(ForwardMessageToDeadLetterQueueRequest)
+ returns (ForwardMessageToDeadLetterQueueResponse) {
+ }
+
+ // Commits or rollback one transactional message.
+ rpc EndTransaction(EndTransactionRequest) returns (EndTransactionResponse) {
+ }
+
+ // Once a client starts, it would immediately establishes bi-lateral stream
+ // RPCs with brokers, reporting its settings as the initiative command.
+ //
+ // When servers have need of inspecting client status, they would issue
+ // telemetry commands to clients. After executing received instructions,
+ // clients shall report command execution results through client-side streams.
+ rpc Telemetry(stream TelemetryCommand) returns (stream TelemetryCommand) {
+ }
+
+ // Notify the server that the client is terminated.
+ rpc NotifyClientTermination(NotifyClientTerminationRequest) returns (NotifyClientTerminationResponse) {
+ }
+
+ // Once a message is retrieved from consume queue on behalf of the group, it
+ // will be kept invisible to other clients of the same group for a period of
+ // time. The message is supposed to be processed within the invisible
+ // duration. If the client, which is in charge of the invisible message, is
+ // not capable of processing the message timely, it may use
+ // ChangeInvisibleDuration to lengthen invisible duration.
+ rpc ChangeInvisibleDuration(ChangeInvisibleDurationRequest) returns (ChangeInvisibleDurationResponse) {
+ }
+}
\ No newline at end of file
diff --git a/rust/src/client.rs b/rust/src/client.rs
new file mode 100644
index 0000000..1a139f6
--- /dev/null
+++ b/rust/src/client.rs
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use crate::command;
+use crate::error;
+use crate::pb::{self, QueryRouteRequest, Resource};
+use crate::{error::ClientError, pb::messaging_service_client::MessagingServiceClient};
+use parking_lot::Mutex;
+use slog::info;
+use slog::{debug, error, o, warn, Logger};
+use std::{
+ collections::HashMap,
+ sync::Arc,
+ sync::{atomic::AtomicUsize, Weak},
+};
+use tokio::sync::oneshot;
+use tonic::transport::{Certificate, Channel, ClientTlsConfig};
+
+#[derive(Debug, Clone)]
+struct Session {
+ stub: MessagingServiceClient<Channel>,
+ logger: Logger,
+}
+
+impl Session {
+ async fn new(endpoint: String, logger: &Logger) -> Result<Self, error::ClientError> {
+ debug!(logger, "Creating session to {}", endpoint);
+ let peer_addr = endpoint.clone();
+
+ let tls = ClientTlsConfig::default();
+
+ let channel = Channel::from_shared(endpoint)
+ .map_err(|e| {
+ error!(logger, "Failed to create channel. Cause: {:?}", e);
+ error::ClientError::Connect
+ })?
+ .tls_config(tls)
+ .map_err(|e| {
+ error!(logger, "Failed to configure TLS. Cause: {:?}", e);
+ error::ClientError::Connect
+ })?
+ .connect_timeout(std::time::Duration::from_secs(3))
+ .tcp_nodelay(true)
+ .connect()
+ .await
+ .map_err(|e| {
+ error!(logger, "Failed to connect to {}. Cause: {:?}", peer_addr, e);
+ error::ClientError::Connect
+ })?;
+
+ let stub = MessagingServiceClient::new(channel);
+
+ Ok(Session {
+ stub,
+ logger: logger.new(o!("peer" => peer_addr)),
+ })
+ }
+
+ async fn query_route(
+ &mut self,
+ request: tonic::Request<pb::QueryRouteRequest>,
+ ) -> Result<tonic::Response<pb::QueryRouteResponse>, error::ClientError> {
+ match self.stub.query_route(request).await {
+ Ok(response) => {
+ return Ok(response);
+ }
+ Err(e) => {
+ error!(self.logger, "QueryRoute failed. Cause: {:?}", e);
+ return Err(error::ClientError::ClientInternal);
+ }
+ }
+ }
+}
+
+#[derive(Debug)]
+struct SessionManager {
+ logger: Logger,
+ tx: tokio::sync::mpsc::Sender<command::Command>,
+}
+
+impl SessionManager {
+ fn new(logger: Logger) -> Self {
+ let (tx, mut rx) = tokio::sync::mpsc::channel(256);
+
+ let submitter_logger = logger.new(o!("component" => "submitter"));
+ tokio::spawn(async move {
+ let mut session_map: HashMap<String, Session> = HashMap::new();
+ loop {
+ match rx.recv().await {
+ Some(command) => match command {
+ command::Command::QueryRoute { peer, request, tx } => {
+ if !session_map.contains_key(&peer) {
+ match Session::new(peer.clone(), &submitter_logger).await {
+ Ok(session) => {
+ session_map.insert(peer.clone(), session);
+ }
+ Err(e) => {
+ error!(
+ submitter_logger,
+ "Failed to create session to {}. Cause: {:?}", peer, e
+ );
+ let _ = tx.send(Err(ClientError::Connect));
+ continue;
+ }
+ }
+ }
+
+ match session_map.get(&peer) {
+ Some(session) => {
+ // Cloning Channel is cheap and encouraged
+ // https://docs.rs/tonic/0.7.2/tonic/transport/struct.Channel.html#multiplexing-requests
+ let mut session = session.clone();
+ tokio::spawn(async move {
+ let result = session.query_route(request).await;
+ let _ = tx.send(result);
+ });
+ }
+ None => {}
+ }
+ }
+ },
+ None => {
+ info!(submitter_logger, "Submit loop exit");
+ break;
+ }
+ }
+ }
+ });
+
+ SessionManager { logger, tx }
+ }
+
+ async fn route(
+ &self,
+ endpoint: &str,
+ topic: &str,
+ client: Weak<&Client>,
+ ) -> Result<Route, error::ClientError> {
+ let client = match client.upgrade() {
+ Some(client) => client,
+ None => {
+ return Err(error::ClientError::ClientInternal);
+ }
+ };
+
+ let request = QueryRouteRequest {
+ topic: Some(Resource {
+ name: topic.to_owned(),
+ resource_namespace: client.arn.clone(),
+ }),
+ endpoints: Some(client.access_point.clone()),
+ };
+
+ let mut request = tonic::Request::new(request);
+ client.sign(request.metadata_mut());
+
+ let (tx1, rx1) = oneshot::channel();
+ let command = command::Command::QueryRoute {
+ peer: endpoint.to_owned(),
+ request,
+ tx: tx1,
+ };
+
+ match self.tx.send(command).await {
+ Ok(_) => {}
+ Err(e) => {
+ error!(self.logger, "Failed to submit request");
+ }
+ }
+
+ match rx1.await {
+ Ok(result) => result.map(|_response| Route {}),
+ Err(e) => {
+ error!(self.logger, "oneshot channel error. Cause: {:?}", e);
+ Err(ClientError::ClientInternal)
+ }
+ }
+ }
+}
+
+#[derive(Debug)]
+struct Route {}
+
+#[derive(Debug)]
+enum RouteStatus {
+ Querying(Vec<oneshot::Sender<Result<Arc<Route>, error::ClientError>>>),
+ Found(Arc<Route>),
+}
+
+#[derive(Debug)]
+pub(crate) struct Client {
+ session_manager: SessionManager,
+ logger: Logger,
+ route_table: Mutex<HashMap<String /* topic */, RouteStatus>>,
+ arn: String,
+ id: String,
+ access_point: pb::Endpoints,
+}
+
+static CLIENT_ID_SEQUENCE: AtomicUsize = AtomicUsize::new(0);
+
+impl Client {
+ fn client_id() -> String {
+ let host = match hostname::get() {
+ Ok(name) => name,
+ Err(_) => "localhost".into(),
+ };
+
+ let host = match host.into_string() {
+ Ok(host) => host,
+ Err(_) => String::from("localhost"),
+ };
+
+ format!(
+ "{}@{}#{}",
+ host,
+ std::process::id(),
+ CLIENT_ID_SEQUENCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
+ )
+ }
+
+ pub(crate) fn new(
+ logger: Logger,
+ access_url: impl std::net::ToSocketAddrs,
+ ) -> Result<Self, error::ClientError> {
+ let id = Self::client_id();
+ let mut access_point = pb::Endpoints {
+ scheme: pb::AddressScheme::IPv4 as i32,
+ addresses: vec![],
+ };
+
+ for socket_addr in access_url
+ .to_socket_addrs()
+ .map_err(|e| error::ClientError::ClientInternal)?
+ {
+ if socket_addr.is_ipv4() {
+ access_point.scheme = pb::AddressScheme::IPv4 as i32;
+ } else {
+ access_point.scheme = pb::AddressScheme::IPv6 as i32;
+ }
+
+ let addr = pb::Address {
+ host: socket_addr.ip().to_string(),
+ port: socket_addr.port() as i32,
+ };
+ access_point.addresses.push(addr);
+ }
+
+ Ok(Client {
+ session_manager: SessionManager::new(logger.new(o!("component" => "session_manager"))),
+ logger,
+ route_table: Mutex::new(HashMap::new()),
+ arn: String::from(""),
+ id,
+ access_point,
+ })
+ }
+
+ async fn query_route(
+ &self,
+ topic: &str,
+ lookup_cache: bool,
+ ) -> Result<Arc<Route>, error::ClientError> {
+ debug!(self.logger, "Query route for topic={}", topic);
+ let rx = match self
+ .route_table
+ .lock()
+ .entry(topic.to_owned())
+ .or_insert_with(|| RouteStatus::Querying(Vec::new()))
+ {
+ RouteStatus::Found(route) => {
+ if lookup_cache {
+ return Ok(Arc::clone(route));
+ }
+ None
+ }
+ RouteStatus::Querying(ref mut v) => {
+ if v.is_empty() {
+ None
+ } else {
+ let (tx, rx) = oneshot::channel();
+ v.push(tx);
+ Some(rx)
+ }
+ }
+ };
+
+ if let Some(rx) = rx {
+ match rx.await {
+ Ok(route) => {
+ return route;
+ }
+ Err(_e) => {
+ return Err(error::ClientError::ClientInternal);
+ }
+ }
+ }
+
+ let client = Arc::new(*&self);
+ let client_weak = Arc::downgrade(&client);
+ let endpoint = "https://127.0.0.1:8081";
+ match self
+ .session_manager
+ .route(endpoint, topic, client_weak)
+ .await
+ {
+ Ok(route) => {
+ let route = Arc::new(route);
+ let prev = self
+ .route_table
+ .lock()
+ .insert(topic.to_owned(), RouteStatus::Found(Arc::clone(&route)));
+
+ match prev {
+ Some(RouteStatus::Found(_)) => {}
+ Some(RouteStatus::Querying(mut v)) => {
+ for item in v.drain(..) {
+ let _ = item.send(Ok(Arc::clone(&route)));
+ }
+ }
+ None => {}
+ };
+ return Ok(route);
+ }
+ Err(_e) => {
+ let prev = self.route_table.lock().remove(topic);
+ match prev {
+ Some(RouteStatus::Found(route)) => {
+ self.route_table
+ .lock()
+ .insert(topic.to_owned(), RouteStatus::Found(Arc::clone(&route)));
+ return Ok(route);
+ }
+ Some(RouteStatus::Querying(mut v)) => {
+ for tx in v.drain(..) {
+ let _ = tx.send(Err(error::ClientError::ClientInternal));
+ }
+ return Err(error::ClientError::ClientInternal);
+ }
+ None => {
+ return Err(error::ClientError::ClientInternal);
+ }
+ };
+ }
+ }
+ }
+
+ fn sign(&self, metadata: &mut tonic::metadata::MetadataMap) {
+ let _ = tonic::metadata::AsciiMetadataValue::try_from(&self.id).and_then(|v| {
+ metadata.insert("x-mq-client-id", v);
+ Ok(())
+ });
+
+ metadata.insert(
+ "x-mq-language",
+ tonic::metadata::AsciiMetadataValue::from_static("RUST"),
+ );
+ metadata.insert(
+ "x-mq-client-version",
+ tonic::metadata::AsciiMetadataValue::from_static("5.0.0"),
+ );
+ metadata.insert(
+ "x-mq-protocol-version",
+ tonic::metadata::AsciiMetadataValue::from_static("2.0.0"),
+ );
+ }
+}
+
+#[cfg(test)]
+mod tests {
+
+ use super::*;
+ use slog::Drain;
+
+ #[test]
+ fn test_client_id() {
+ let mut set = std::collections::HashSet::new();
+ (0..256).for_each(|_| {
+ let id = Client::client_id();
+ assert_eq!(false, set.contains(&id));
+ set.insert(id);
+ });
+ }
+
+ #[tokio::test]
+ async fn test_session_manager_new() {
+ let _session_manager = SessionManager::new(create_logger());
+ drop(_session_manager);
+ }
+
+ fn create_logger() -> Logger {
+ let decorator = slog_term::TermDecorator::new().build();
+ let drain = slog_term::FullFormat::new(decorator).build().fuse();
+ let drain = slog_async::Async::new(drain).build().fuse();
+ slog::Logger::root(drain, o!())
+ }
+}
diff --git a/rust/src/command.rs b/rust/src/command.rs
new file mode 100644
index 0000000..5a3f3fa
--- /dev/null
+++ b/rust/src/command.rs
@@ -0,0 +1,13 @@
+use crate::client::Client;
+use crate::error::ClientError;
+use crate::pb::{QueryRouteRequest, QueryRouteResponse};
+use tokio::sync::oneshot;
+use tonic::{Request, Response};
+
+pub(crate) enum Command {
+ QueryRoute {
+ peer: String,
+ request: Request<QueryRouteRequest>,
+ tx: oneshot::Sender<Result<Response<QueryRouteResponse>, ClientError>>,
+ },
+}
diff --git a/rust/src/error.rs b/rust/src/error.rs
new file mode 100644
index 0000000..f31f5d4
--- /dev/null
+++ b/rust/src/error.rs
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use thiserror::Error;
+
+#[derive(Error, Debug)]
+pub enum ClientError {
+ #[error("Failed to create session")]
+ Connect,
+
+ #[error("Client internal error")]
+ ClientInternal,
+}
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
new file mode 100644
index 0000000..d974146
--- /dev/null
+++ b/rust/src/lib.rs
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#[path = "pb/apache.rocketmq.v2.rs"]
+mod pb;
+
+mod error;
+
+mod command;
+
+mod client;
+
+mod producer;
diff --git a/rust/src/main.rs b/rust/src/main.rs
new file mode 100644
index 0000000..d69ca78
--- /dev/null
+++ b/rust/src/main.rs
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+fn main() {
+ println!("Hello, world!");
+}
diff --git a/rust/src/pb/README.md b/rust/src/pb/README.md
new file mode 100644
index 0000000..e198bf0
--- /dev/null
+++ b/rust/src/pb/README.md
@@ -0,0 +1,5 @@
+## Note
+--------------------------------------------------------
+Files in this directory are generated tonic_build crate.
+
+**DO NOT EDIT THEM!**
\ No newline at end of file
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
new file mode 100644
index 0000000..685eb82
--- /dev/null
+++ b/rust/src/producer.rs
@@ -0,0 +1,25 @@
+use slog::Logger;
+
+use crate::{client, error};
+
+struct Producer {
+ client: client::Client,
+}
+
+impl Producer {
+ pub async fn new<T>(logger: Logger, topics: T) -> Result<Self, error::ClientError>
+ where
+ T: IntoIterator,
+ T::Item: AsRef<str>,
+ {
+ let access_point = "localhost:8081";
+ let client = client::Client::new(logger, access_point)?;
+ for _topic in topics.into_iter() {
+ // client.subscribe(topic.as_ref()).await;
+ }
+
+ Ok(Producer { client })
+ }
+
+ pub fn start(&mut self) {}
+}