You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/07/29 06:20:11 UTC

[rocketmq-clients] branch master updated: Set up Rust boilerplate code (#92)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 4df9f0e  Set up Rust boilerplate code (#92)
4df9f0e is described below

commit 4df9f0e31fa30a3791b1acd8c02c4e1682b3d068
Author: Zhanhui Li <li...@gmail.com>
AuthorDate: Fri Jul 29 14:20:07 2022 +0800

    Set up Rust boilerplate code (#92)
---
 .gitignore                                     |   7 +
 rust/Cargo.toml                                |  21 ++
 rust/build.rs                                  |  14 +
 rust/proto/apache/rocketmq/v2/admin.proto      |  43 +++
 rust/proto/apache/rocketmq/v2/definition.proto | 450 ++++++++++++++++++++++++
 rust/proto/apache/rocketmq/v2/service.proto    | 455 +++++++++++++++++++++++++
 rust/src/client.rs                             | 410 ++++++++++++++++++++++
 rust/src/command.rs                            |  13 +
 rust/src/error.rs                              |  26 ++
 rust/src/lib.rs                                |  26 ++
 rust/src/main.rs                               |  20 ++
 rust/src/pb/README.md                          |   5 +
 rust/src/producer.rs                           |  25 ++
 13 files changed, 1515 insertions(+)

diff --git a/.gitignore b/.gitignore
index 68a6ad8..1ea2e09 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,3 +27,10 @@ dependency-reduced-pom.xml
 
 # C# 
 obj/
+# Java
+java/client/src/main/java/org/apache/rocketmq/client/java/example/
+
+# Rust
+rust/target
+rust/Cargo.lock
+rust/src/pb/*.rs
\ No newline at end of file
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
new file mode 100644
index 0000000..b290d64
--- /dev/null
+++ b/rust/Cargo.toml
@@ -0,0 +1,21 @@
+[package]
+name = "rocketmq"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+tokio = { version = "1", features = ["full"] }
+tonic = {version = "0.7", features = ["tls", "default", "channel", "tls-roots"]}
+prost = "0.10"
+prost-types = "0.10"
+thiserror = "1.0"
+slog = {version = "2.7.0", features=["max_level_trace", "release_max_level_info"]}
+slog-term = "2.9.0"
+slog-async = "2.7.0"
+parking_lot = "0.12"
+hmac = "0.12"
+hostname = "0.3.1"
+tokio-rustls = {version = "0.23", features = ["default", "dangerous_configuration"] }
+
+[build-dependencies]
+tonic-build = "0.7"
\ No newline at end of file
diff --git a/rust/build.rs b/rust/build.rs
new file mode 100644
index 0000000..eebab8a
--- /dev/null
+++ b/rust/build.rs
@@ -0,0 +1,14 @@
+fn main() {
+    tonic_build::configure()
+        .build_client(true)
+        .build_server(false)
+        .out_dir("src/pb")
+        .compile(
+            &[
+                "proto/apache/rocketmq/v2/service.proto",
+                "proto/apache/rocketmq/v2/admin.proto",
+            ],
+            &["proto"],
+        )
+        .unwrap();
+}
diff --git a/rust/proto/apache/rocketmq/v2/admin.proto b/rust/proto/apache/rocketmq/v2/admin.proto
new file mode 100644
index 0000000..7dbb702
--- /dev/null
+++ b/rust/proto/apache/rocketmq/v2/admin.proto
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package apache.rocketmq.v2;
+
+option cc_enable_arenas = true;
+option csharp_namespace = "Apache.Rocketmq.V2";
+option java_multiple_files = true;
+option java_package = "apache.rocketmq.v2";
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+option java_outer_classname = "MQAdmin";
+
+message ChangeLogLevelRequest {
+  enum Level {
+    TRACE = 0;
+    DEBUG = 1;
+    INFO = 2;
+    WARN = 3;
+    ERROR = 4;
+  }
+  Level level = 1;
+}
+
+message ChangeLogLevelResponse { string remark = 1; }
+
+service Admin {
+  rpc ChangeLogLevel(ChangeLogLevelRequest) returns (ChangeLogLevelResponse) {}
+}
\ No newline at end of file
diff --git a/rust/proto/apache/rocketmq/v2/definition.proto b/rust/proto/apache/rocketmq/v2/definition.proto
new file mode 100644
index 0000000..86bb3dd
--- /dev/null
+++ b/rust/proto/apache/rocketmq/v2/definition.proto
@@ -0,0 +1,450 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+import "google/protobuf/timestamp.proto";
+import "google/protobuf/duration.proto";
+
+package apache.rocketmq.v2;
+
+option csharp_namespace = "Apache.Rocketmq.V2";
+option java_multiple_files = true;
+option java_package = "apache.rocketmq.v2";
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+option java_outer_classname = "MQDomain";
+
+enum TransactionResolution {
+  TRANSACTION_RESOLUTION_UNSPECIFIED = 0;
+  COMMIT = 1;
+  ROLLBACK = 2;
+}
+
+enum TransactionSource {
+  SOURCE_UNSPECIFIED = 0;
+  SOURCE_CLIENT = 1;
+  SOURCE_SERVER_CHECK = 2;
+}
+
+enum Permission {
+  PERMISSION_UNSPECIFIED = 0;
+  NONE = 1;
+  READ = 2;
+  WRITE = 3;
+  READ_WRITE = 4;
+}
+
+enum FilterType {
+  FILTER_TYPE_UNSPECIFIED = 0;
+  TAG = 1;
+  SQL = 2;
+}
+
+message FilterExpression {
+  FilterType type = 1;
+  string expression = 2;
+}
+
+message RetryPolicy {
+  int32 max_attempts = 1;
+  oneof strategy {
+    ExponentialBackoff exponential_backoff = 2;
+    CustomizedBackoff customized_backoff = 3;
+  }
+}
+
+// https://en.wikipedia.org/wiki/Exponential_backoff
+message ExponentialBackoff {
+  google.protobuf.Duration initial = 1;
+  google.protobuf.Duration max = 2;
+  float multiplier = 3;
+}
+
+message CustomizedBackoff {
+  // To support classic backoff strategy which is arbitrarily defined by end users.
+  // Typical values are: `1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h`
+  repeated google.protobuf.Duration next = 1;
+}
+
+message Resource {
+  string resource_namespace = 1;
+
+  // Resource name identifier, which remains unique within the abstract resource
+  // namespace.
+  string name = 2;
+}
+
+message SubscriptionEntry {
+  Resource topic = 1;
+  FilterExpression expression = 2;
+}
+
+enum AddressScheme {
+  ADDRESS_SCHEME_UNSPECIFIED = 0;
+  IPv4 = 1;
+  IPv6 = 2;
+  DOMAIN_NAME = 3;
+}
+
+message Address {
+  string host = 1;
+  int32 port = 2;
+}
+
+message Endpoints {
+  AddressScheme scheme = 1;
+  repeated Address addresses = 2;
+}
+
+message Broker {
+  // Name of the broker
+  string name = 1;
+
+  // Broker index. Canonically, index = 0 implies that the broker is playing
+  // leader role while brokers with index > 0 play follower role.
+  int32 id = 2;
+
+  // Address of the broker, complying with the following scheme
+  // 1. dns:[//authority/]host[:port]
+  // 2. ipv4:address[:port][,address[:port],...] – IPv4 addresses
+  // 3. ipv6:address[:port][,address[:port],...] – IPv6 addresses
+  Endpoints endpoints = 3;
+}
+
+message MessageQueue {
+  Resource topic = 1;
+  int32 id = 2;
+  Permission permission = 3;
+  Broker broker = 4;
+  repeated MessageType accept_message_types = 5;
+}
+
+enum MessageType {
+  MESSAGE_TYPE_UNSPECIFIED = 0;
+
+  NORMAL = 1;
+
+  // Sequenced message
+  FIFO = 2;
+
+  // Messages that are delivered after the specified duration.
+  DELAY = 3;
+
+  // Messages that are transactional. Only committed messages are delivered to
+  // subscribers.
+  TRANSACTION = 4;
+}
+
+enum DigestType {
+  DIGEST_TYPE_UNSPECIFIED = 0;
+
+  // CRC algorithm achieves goal of detecting random data error with lowest
+  // computation overhead.
+  CRC32 = 1;
+
+  // MD5 algorithm achieves good balance between collision rate and computation
+  // overhead.
+  MD5 = 2;
+
+  // SHA-family has substantially fewer collisions with fair amount of
+  // computation.
+  SHA1 = 3;
+}
+
+// When publishing messages to or subscribing messages from brokers, clients
+// shall include or validate digests of message body to ensure data integrity.
+//
+// For message publishing, when an invalid digest is detected, brokers need to
+// respond to the client with BAD_REQUEST.
+//
+// For message subscription, when an invalid digest is detected, consumers
+// need to handle this case according to message type:
+// 1) Standard messages should be negatively acknowledged instantly, causing
+// immediate re-delivery; 2) FIFO messages require special RPC, to re-fetch
+// previously acquired messages batch;
+//
+// Message consumption model also affects how invalid digests are handled. When
+// messages are consumed in broadcasting way,
+// TODO: define semantics of invalid-digest-when-broadcasting.
+message Digest {
+  DigestType type = 1;
+  string checksum = 2;
+}
+
+enum ClientType {
+  CLIENT_TYPE_UNSPECIFIED = 0;
+  PRODUCER = 1;
+  PUSH_CONSUMER = 2;
+  SIMPLE_CONSUMER = 3;
+}
+
+enum Encoding {
+  ENCODING_UNSPECIFIED = 0;
+
+  IDENTITY = 1;
+
+  GZIP = 2;
+}
+
+message SystemProperties {
+  // Tag, which is optional.
+  optional string tag = 1;
+
+  // Message keys
+  repeated string keys = 2;
+
+  // Message identifier, client-side generated, remains unique.
+  // if message_id is empty, the send message request will be aborted with
+  // status `INVALID_ARGUMENT`
+  string message_id = 3;
+
+  // Message body digest
+  Digest body_digest = 4;
+
+  // Message body encoding. Candidate options are identity, gzip, snappy etc.
+  Encoding body_encoding = 5;
+
+  // Message type, normal, FIFO or transactional.
+  MessageType message_type = 6;
+
+  // Message born time-point.
+  google.protobuf.Timestamp born_timestamp = 7;
+
+  // Message born host. Valid options are IPv4, IPv6 or client host domain name.
+  string born_host = 8;
+
+  // Time-point at which the message is stored in the broker, which is absent
+  // for message publishing.
+  optional google.protobuf.Timestamp store_timestamp = 9;
+
+  // The broker that stores this message. It may be broker name, IP or arbitrary
+  // identifier that uniquely identify the server.
+  string store_host = 10;
+
+  // Time-point at which broker delivers to clients, which is optional.
+  optional google.protobuf.Timestamp delivery_timestamp = 11;
+
+  // If a message is acquired by way of POP, this field holds the receipt,
+  // which is absent for message publishing.
+  // Clients use the receipt to acknowledge or negatively acknowledge the
+  // message.
+  optional string receipt_handle = 12;
+
+  // Message queue identifier in which a message is physically stored.
+  int32 queue_id = 13;
+
+  // Message-queue offset at which a message is stored, which is absent for
+  // message publishing.
+  optional int64 queue_offset = 14;
+
+  // Period of time a message would remain invisible once it is acquired.
+  optional google.protobuf.Duration invisible_duration = 15;
+
+  // Business code may fail to process messages for the moment. Hence, clients
+  // may request servers to deliver them again using certain back-off strategy,
+  // the attempt is 1 not 0 if message is delivered first time, and it is absent
+  // for message publishing.
+  optional int32 delivery_attempt = 16;
+
+  // Define the group name of message in the same topic, which is optional.
+  optional string message_group = 17;
+
+  // Trace context for each message, which is optional.
+  optional string trace_context = 18;
+
+  // If a transactional message stays unresolved for more than
+  // `transaction_orphan_threshold`, it would be regarded as an
+  // orphan. Servers that manage orphan messages would pick up
+  // a capable publisher to resolve it.
+  optional google.protobuf.Duration orphaned_transaction_recovery_duration = 19;
+}
+
+message Message {
+  Resource topic = 1;
+
+  // User defined key-value pairs.
+  // If user_properties contain the reserved keys by RocketMQ,
+  // the send message request will be aborted with status `INVALID_ARGUMENT`.
+  // See below links for the reserved keys
+  // https://github.com/apache/rocketmq/blob/master/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java#L58
+  map<string, string> user_properties = 2;
+
+  SystemProperties system_properties = 3;
+
+  bytes body = 4;
+}
+
+message Assignment {
+  MessageQueue message_queue = 1;
+}
+
+enum Code {
+  CODE_UNSPECIFIED = 0;
+
+  // Generic code for success.
+  OK = 20000;
+
+  // Generic code for multiple return results.
+  MULTIPLE_RESULTS = 30000;
+
+  // Generic code for bad request, indicating that required fields or headers are missing.
+  BAD_REQUEST = 40000;
+  // Format of access point is illegal.
+  ILLEGAL_ACCESS_POINT = 40001;
+  // Format of topic is illegal.
+  ILLEGAL_TOPIC = 40002;
+  // Format of consumer group is illegal.
+  ILLEGAL_CONSUMER_GROUP = 40003;
+  // Format of message tag is illegal.
+  ILLEGAL_MESSAGE_TAG = 40004;
+  // Format of message key is illegal.
+  ILLEGAL_MESSAGE_KEY = 40005;
+  // Format of message group is illegal.
+  ILLEGAL_MESSAGE_GROUP = 40006;
+  // Format of message property key is illegal.
+  ILLEGAL_MESSAGE_PROPERTY_KEY = 40007;
+  // Transaction id is invalid.
+  INVALID_TRANSACTION_ID = 40008;
+  // Format of message id is illegal.
+  ILLEGAL_MESSAGE_ID = 40009;
+  // Format of filter expression is illegal.
+  ILLEGAL_FILTER_EXPRESSION = 40010;
+  // The invisible time of request is invalid.
+  ILLEGAL_INVISIBLE_TIME = 40011;
+  // The delivery timestamp of message is invalid.
+  ILLEGAL_DELIVERY_TIME = 40012;
+  // Receipt handle of message is invalid.
+  INVALID_RECEIPT_HANDLE = 40013;
+  // Message property conflicts with its type.
+  MESSAGE_PROPERTY_CONFLICT_WITH_TYPE = 40014;
+  // Client type could not be recognized.
+  UNRECOGNIZED_CLIENT_TYPE = 40015;
+  // Message is corrupted.
+  MESSAGE_CORRUPTED = 40016;
+  // Request is rejected due to missing of x-mq-client-id header.
+  CLIENT_ID_REQUIRED = 40017;
+
+  // Generic code indicates that the client request lacks valid authentication
+  // credentials for the requested resource.
+  UNAUTHORIZED = 40100;
+
+  // Generic code indicates that the account is suspended due to overdue of payment.
+  PAYMENT_REQUIRED = 40200;
+
+  // Generic code for the case that user does not have the permission to operate.
+  FORBIDDEN = 40300;
+
+  // Generic code for resource not found.
+  NOT_FOUND = 40400;
+  // Message not found from server.
+  MESSAGE_NOT_FOUND = 40401;
+  // Topic resource does not exist.
+  TOPIC_NOT_FOUND = 40402;
+  // Consumer group resource does not exist.
+  CONSUMER_GROUP_NOT_FOUND = 40403;
+
+  // Generic code representing client side timeout when connecting to, reading data from, or write data to server.
+  REQUEST_TIMEOUT = 40800;
+
+  // Generic code represents that the request entity is larger than limits defined by server.
+  PAYLOAD_TOO_LARGE = 41300;
+  // Message body size exceeds the threshold.
+  MESSAGE_BODY_TOO_LARGE = 41301;
+
+  // Generic code for use cases where pre-conditions are not met.
+  // For example, if a producer instance is used to publish messages without prior start() invocation,
+  // this error code will be raised.
+  PRECONDITION_FAILED = 42800;
+
+  // Generic code indicates that too many requests are made in short period of duration.
+  // Requests are throttled.
+  TOO_MANY_REQUESTS = 42900;
+
+  // Generic code for the case that the server is unwilling to process the request because its header fields are too large.
+  // The request may be resubmitted after reducing the size of the request header fields.
+  REQUEST_HEADER_FIELDS_TOO_LARGE = 43100;
+  // Message properties total size exceeds the threshold.
+  MESSAGE_PROPERTIES_TOO_LARGE = 43101;
+
+  // Generic code indicates that server/client encountered an unexpected
+  // condition that prevented it from fulfilling the request.
+  INTERNAL_ERROR = 50000;
+  // Code indicates that the server encountered an unexpected condition
+  // that prevented it from fulfilling the request.
+  // This error response is a generic "catch-all" response.
+  // Usually, this indicates the server cannot find a better alternative
+  // error code to response. Sometimes, server administrators log error
+  // responses like the 500 status code with more details about the request
+  // to prevent the error from happening again in the future.
+  //
+  // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500
+  INTERNAL_SERVER_ERROR = 50001;
+  // The HA-mechanism is not working now.
+  HA_NOT_AVAILABLE = 50002;
+
+  // Generic code means that the server or client does not support the
+  // functionality required to fulfill the request.
+  NOT_IMPLEMENTED = 50100;
+
+  // Generic code represents that the server, which acts as a gateway or proxy,
+  // does not get a satisfactory response in time from its upstream servers.
+  // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/504
+  PROXY_TIMEOUT = 50400;
+  // Message persistence timeout.
+  MASTER_PERSISTENCE_TIMEOUT = 50401;
+  // Slave persistence timeout.
+  SLAVE_PERSISTENCE_TIMEOUT = 50402;
+
+  // Generic code for unsupported operation.
+  UNSUPPORTED = 50500;
+  // Operation is not allowed in current version.
+  VERSION_UNSUPPORTED = 50501;
+  // Not allowed to verify message. Chances are that you are verifying
+  // a FIFO message, which would violate FIFO semantics.
+  VERIFY_FIFO_MESSAGE_UNSUPPORTED = 50502;
+
+  // Generic code for failed message consumption.
+  FAILED_TO_CONSUME_MESSAGE = 60000;
+}
+
+message Status {
+  Code code = 1;
+  string message = 2;
+}
+
+enum Language {
+  LANGUAGE_UNSPECIFIED = 0;
+  JAVA = 1;
+  CPP = 2;
+  DOT_NET = 3;
+  GOLANG = 4;
+  RUST = 5;
+}
+
+// User Agent
+message UA {
+  // SDK language
+  Language language = 1;
+
+  // SDK version
+  string version = 2;
+
+  // Platform details, including OS name, version, arch etc.
+  string platform = 3;
+
+  // Hostname of the node
+  string hostname = 4;
+}
\ No newline at end of file
diff --git a/rust/proto/apache/rocketmq/v2/service.proto b/rust/proto/apache/rocketmq/v2/service.proto
new file mode 100644
index 0000000..42bf332
--- /dev/null
+++ b/rust/proto/apache/rocketmq/v2/service.proto
@@ -0,0 +1,455 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+import "google/protobuf/duration.proto";
+import "google/protobuf/timestamp.proto";
+
+import "apache/rocketmq/v2/definition.proto";
+
+package apache.rocketmq.v2;
+
+option csharp_namespace = "Apache.Rocketmq.V2";
+option java_multiple_files = true;
+option java_package = "apache.rocketmq.v2";
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+option java_outer_classname = "MQService";
+
+// Topics are destinations of messages to publish to or subscribe from. Similar
+// to domain names, they will be addressable after resolution through the
+// provided access point.
+//
+// Access points are usually the addresses of name servers, which fulfill
+// service discovery, load-balancing and other auxiliary services. Name servers
+// receive periodic heartbeats from affiliate brokers and erase those which
+// failed to maintain alive status.
+//
+// Name servers answer queries of QueryRouteRequest, responding clients with
+// addressable message-queues, which they may directly publish messages to or
+// subscribe messages from.
+//
+// QueryRouteRequest shall include source endpoints, aka, configured
+// access-point, which annotates tenant-id, instance-id or other
+// vendor-specific settings. Purpose-built name servers may respond customized
+// results based on these particular requirements.
+message QueryRouteRequest {
+  Resource topic = 1;
+  Endpoints endpoints = 2;
+}
+
+message QueryRouteResponse {
+  Status status = 1;
+
+  repeated MessageQueue message_queues = 2;
+}
+
+message SendMessageRequest {
+  repeated Message messages = 1;
+}
+
+message SendResultEntry {
+  Status status = 1;
+  string message_id = 2;
+  string transaction_id = 3;
+  int64 offset = 4;
+}
+
+message SendMessageResponse {
+  Status status = 1;
+
+  // Some implementation may have partial failure issues. Client SDK developers are expected to inspect
+  // each entry for best certainty.
+  repeated SendResultEntry entries = 2;
+}
+
+message QueryAssignmentRequest {
+  Resource topic = 1;
+  Resource group = 2;
+  Endpoints endpoints = 3;
+}
+
+message QueryAssignmentResponse {
+  Status status = 1;
+  repeated Assignment assignments = 2;
+}
+
+message ReceiveMessageRequest {
+  Resource group = 1;
+  MessageQueue message_queue = 2;
+  FilterExpression filter_expression = 3;
+  int32 batch_size = 4;
+  // Required if client type is simple consumer.
+  optional google.protobuf.Duration invisible_duration = 5;
+  // For message auto renew and clean
+  bool auto_renew = 6;
+}
+
+message ReceiveMessageResponse {
+  oneof content {
+    Status status = 1;
+    Message message = 2;
+    // The timestamp that brokers start to deliver status line or message.
+    google.protobuf.Timestamp delivery_timestamp = 3;
+  }
+}
+
+message AckMessageEntry {
+  string message_id = 1;
+  string receipt_handle = 2;
+}
+
+message AckMessageRequest {
+  Resource group = 1;
+  Resource topic = 2;
+  repeated AckMessageEntry entries = 3;
+}
+
+message AckMessageResultEntry {
+  string message_id = 1;
+  string receipt_handle = 2;
+
+  // Acknowledge result may be acquired through inspecting
+  // `status.code`; In case acknowledgement failed, `status.message`
+  // is the explanation of the failure.
+  Status status = 3;
+}
+
+message AckMessageResponse {
+  // RPC tier status, which is used to represent RPC-level errors including
+  // authentication, authorization, throttling and other general failures.
+  Status status = 1;
+
+  repeated AckMessageResultEntry entries = 2;
+}
+
+message ForwardMessageToDeadLetterQueueRequest {
+  Resource group = 1;
+  Resource topic = 2;
+  string receipt_handle = 3;
+  string message_id = 4;
+  int32 delivery_attempt = 5;
+  int32 max_delivery_attempts = 6;
+}
+
+message ForwardMessageToDeadLetterQueueResponse {
+  Status status = 1;
+}
+
+message HeartbeatRequest {
+  optional Resource group = 1;
+  ClientType client_type = 2;
+}
+
+message HeartbeatResponse {
+  Status status = 1;
+}
+
+message EndTransactionRequest {
+  Resource topic = 1;
+  string message_id = 2;
+  string transaction_id = 3;
+  TransactionResolution resolution = 4;
+  TransactionSource source = 5;
+  string trace_context = 6;
+}
+
+message EndTransactionResponse {
+  Status status = 1;
+}
+
+message PrintThreadStackTraceCommand {
+  string nonce = 1;
+}
+
+message ThreadStackTrace {
+  string nonce = 1;
+  optional string thread_stack_trace = 2;
+}
+
+message VerifyMessageCommand {
+  string nonce = 1;
+  Message message = 2;
+}
+
+message VerifyMessageResult {
+  string nonce = 1;
+}
+
+message RecoverOrphanedTransactionCommand {
+  Message message = 1;
+  string transaction_id = 2;
+}
+
+message Publishing {
+  // Publishing settings below here are appointed by client, thus it is
+  // unnecessary for server to push at present.
+  //
+  // List of topics to which messages will be published.
+  repeated Resource topics = 1;
+
+  // If the message body size exceeds `max_body_size`, broker servers would
+  // reject the request. As a result, it is advisable that Producer performs
+  // client-side check validation.
+  int32 max_body_size = 2;
+
+  // When the `validate_message_type` flag is set to `false`, there is no need to validate
+  // message's type with messageQueue's `accept_message_types` before publishing.
+  bool validate_message_type = 3;
+}
+
+message Subscription {
+  // Subscription settings below here are appointed by client, thus it is
+  // unnecessary for server to push at present.
+  //
+  // Consumer group.
+  optional Resource group = 1;
+
+  // Subscription for consumer.
+  repeated SubscriptionEntry subscriptions = 2;
+
+  // Subscription settings below here are from server, it is essential for
+  // server to push.
+  //
+  // When FIFO flag is `true`, messages of the same message group are processed
+  // in first-in-first-out manner.
+  //
+  // Brokers will not deliver further messages of the same group until prior
+  // ones are completely acknowledged.
+  optional bool fifo = 3;
+
+  // Message receive batch size here is essential for push consumer.
+  optional int32 receive_batch_size = 4;
+
+  // Long-polling timeout for `ReceiveMessageRequest`, which is essential for
+  // push consumer.
+  optional google.protobuf.Duration long_polling_timeout = 5;
+}
+
+message Metric {
+  // Indicates whether client should export local metrics to server.
+  bool on = 1;
+
+  // The endpoint that client metrics should be exported to, which is required if the switch is on.
+  optional Endpoints endpoints = 2;
+}
+
+message Settings {
+  // Configurations for all clients.
+  optional ClientType client_type = 1;
+
+  optional Endpoints access_point = 2;
+
+  // If publishing of messages encounters throttling or server internal errors,
+  // publishers should implement automatic retries after progressive longer
+  // back-offs for consecutive errors.
+  //
+  // When processing message fails, `backoff_policy` describes an interval
+  // after which the message should be available to consume again.
+  //
+  // For FIFO messages, the interval should be relatively small because
+  // messages of the same message group would not be readily available until
+  // the prior one depletes its lifecycle.
+  optional RetryPolicy backoff_policy = 3;
+
+  // Request timeout for RPCs excluding long-polling.
+  optional google.protobuf.Duration request_timeout = 4;
+
+  oneof pub_sub {
+    Publishing publishing = 5;
+
+    Subscription subscription = 6;
+  }
+
+  // User agent details
+  UA user_agent = 7;
+
+  Metric metric = 8;
+}
+
+message TelemetryCommand {
+  optional Status status = 1;
+
+  oneof command {
+    // Client settings
+    Settings settings = 2;
+
+    // These messages are from client.
+    //
+    // Report thread stack trace to server.
+    ThreadStackTrace thread_stack_trace = 3;
+
+    // Report message verify result to server.
+    VerifyMessageResult verify_message_result = 4;
+
+    // These messages are from server.
+    //
+    // Request client to recover the orphaned transaction message.
+    RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 5;
+
+    // Request client to print thread stack trace.
+    PrintThreadStackTraceCommand print_thread_stack_trace_command = 6;
+
+    // Request client to verify the consumption of the appointed message.
+    VerifyMessageCommand verify_message_command = 7;
+  }
+}
+
+message NotifyClientTerminationRequest {
+  // Consumer group, which is absent for producer.
+  optional Resource group = 1;
+}
+
+message NotifyClientTerminationResponse {
+  Status status = 1;
+}
+
+message ChangeInvisibleDurationRequest {
+  Resource group = 1;
+  Resource topic = 2;
+
+  // Unique receipt handle to identify message to change
+  string receipt_handle = 3;
+
+  // New invisible duration
+  google.protobuf.Duration invisible_duration = 4;
+
+  // For message tracing
+  string message_id = 5;
+}
+
+message ChangeInvisibleDurationResponse {
+  Status status = 1;
+
+  // Server may generate a new receipt handle for the message.
+  string receipt_handle = 2;
+}
+
+// For all the RPCs in MessagingService, the following error handling policies
+// apply:
+//
+// If the request doesn't bear a valid authentication credential, return a
+// response with common.status.code == `UNAUTHENTICATED`. If the authenticated
+// user is not granted with sufficient permission to execute the requested
+// operation, return a response with common.status.code == `PERMISSION_DENIED`.
+// If the per-user-resource-based quota is exhausted, return a response with
+// common.status.code == `RESOURCE_EXHAUSTED`. If any unexpected server-side
+// errors raise, return a response with common.status.code == `INTERNAL`.
+service MessagingService {
+  // Queries the route entries of the requested topic in the perspective of the
+  // given endpoints. On success, servers should return a collection of
+  // addressable message-queues. Note servers may return customized route
+  // entries based on endpoints provided.
+  //
+  // If the requested topic doesn't exist, returns `NOT_FOUND`.
+  // If the specific endpoints is empty, returns `INVALID_ARGUMENT`.
+  rpc QueryRoute(QueryRouteRequest) returns (QueryRouteResponse) {
+  }
+
+  // Producer or consumer sends HeartbeatRequest to servers periodically to
+  // keep-alive. Additionally, it also reports client-side configuration,
+  // including topic subscription, load-balancing group name, etc.
+  //
+  // Returns `OK` if success.
+  //
+  // If a client specifies a language that is not yet supported by servers,
+  // returns `INVALID_ARGUMENT`
+  rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {
+  }
+
+  // Delivers messages to brokers.
+  // Clients may further:
+  // 1. Refine a message destination to message-queues which fulfills parts of
+  // FIFO semantic;
+  // 2. Flag a message as transactional, which keeps it invisible to consumers
+  // until it commits;
+  // 3. Time a message, making it invisible to consumers till specified
+  // time-point;
+  // 4. And more...
+  //
+  // Returns message-id or transaction-id with status `OK` on success.
+  //
+  // If the destination topic doesn't exist, returns `NOT_FOUND`.
+  rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {
+  }
+
+  // Queries the assigned route info of a topic for current consumer,
+  // the returned assignment result is decided by server-side load balancer.
+  //
+  // If the corresponding topic doesn't exist, returns `NOT_FOUND`.
+  // If the specific endpoints is empty, returns `INVALID_ARGUMENT`.
+  rpc QueryAssignment(QueryAssignmentRequest) returns (QueryAssignmentResponse) {
+  }
+
+  // Receives messages from the server in batch manner, returns a set of
+  // messages if success. The received messages should be acked or redelivered
+  // after processed.
+  //
+  // If the pending concurrent receive requests exceed the quota of the given
+  // consumer group, returns `UNAVAILABLE`. If the upstream store server hangs,
+  // return `DEADLINE_EXCEEDED` in a timely manner. If the corresponding topic
+  // or consumer group doesn't exist, returns `NOT_FOUND`. If there is no new
+  // message in the specific topic, returns `OK` with an empty message set.
+  // Please note that client may suffer from false empty responses.
+  //
+  // If failed to receive message from remote, server must return only one
+  // `ReceiveMessageResponse` as the reply to the request, whose `Status` indicates
+  // the specific reason of failure, otherwise, the reply is considered successful.
+  rpc ReceiveMessage(ReceiveMessageRequest) returns (stream ReceiveMessageResponse) {
+  }
+
+  // Acknowledges the message associated with the `receipt_handle` or `offset`
+  // in the `AckMessageRequest`, it means the message has been successfully
+  // processed. Returns `OK` if the message server removes the relevant message
+  // successfully.
+  //
+  // If the given receipt_handle is illegal or out of date, returns
+  // `INVALID_ARGUMENT`.
+  rpc AckMessage(AckMessageRequest) returns (AckMessageResponse) {
+  }
+
+  // Forwards one message to dead letter queue if the max delivery attempts is
+  // exceeded by this message at client-side, return `OK` if success.
+  rpc ForwardMessageToDeadLetterQueue(ForwardMessageToDeadLetterQueueRequest)
+      returns (ForwardMessageToDeadLetterQueueResponse) {
+  }
+
+  // Commits or rolls back one transactional message.
+  rpc EndTransaction(EndTransactionRequest) returns (EndTransactionResponse) {
+  }
+
+  // Once a client starts, it would immediately establish bi-lateral stream
+  // RPCs with brokers, reporting its settings as the initiative command.
+  //
+  // When servers have need of inspecting client status, they would issue
+  // telemetry commands to clients. After executing received instructions,
+  // clients shall report command execution results through client-side streams.
+  rpc Telemetry(stream TelemetryCommand) returns (stream TelemetryCommand) {
+  }
+
+  // Notify the server that the client is terminated.
+  rpc NotifyClientTermination(NotifyClientTerminationRequest) returns (NotifyClientTerminationResponse) {
+  }
+
+  // Once a message is retrieved from consume queue on behalf of the group, it
+  // will be kept invisible to other clients of the same group for a period of
+  // time. The message is supposed to be processed within the invisible
+  // duration. If the client, which is in charge of the invisible message, is
+  // not capable of processing the message timely, it may use
+  // ChangeInvisibleDuration to lengthen invisible duration.
+  rpc ChangeInvisibleDuration(ChangeInvisibleDurationRequest) returns (ChangeInvisibleDurationResponse) {
+  }
+}
\ No newline at end of file
diff --git a/rust/src/client.rs b/rust/src/client.rs
new file mode 100644
index 0000000..1a139f6
--- /dev/null
+++ b/rust/src/client.rs
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use crate::command;
+use crate::error;
+use crate::pb::{self, QueryRouteRequest, Resource};
+use crate::{error::ClientError, pb::messaging_service_client::MessagingServiceClient};
+use parking_lot::Mutex;
+use slog::info;
+use slog::{debug, error, o, warn, Logger};
+use std::{
+    collections::HashMap,
+    sync::Arc,
+    sync::{atomic::AtomicUsize, Weak},
+};
+use tokio::sync::oneshot;
+use tonic::transport::{Certificate, Channel, ClientTlsConfig};
+
+#[derive(Debug, Clone)]
+struct Session { // A connected gRPC client for one peer, cloned per request by the submit loop.
+    stub: MessagingServiceClient<Channel>, // Generated service client over the channel built in `Session::new`.
+    logger: Logger, // Child logger created in `Session::new` with a `peer` key.
+}
+
+impl Session {
+    /// Connects to `endpoint` over a TLS-configured gRPC channel and wraps it in a `Session`.
+    ///
+    /// The connect attempt uses a 3-second timeout with `tcp_nodelay` enabled. Every failure
+    /// (invalid URI, TLS configuration, connect) is logged and mapped to
+    /// `ClientError::Connect`.
+    async fn new(endpoint: String, logger: &Logger) -> Result<Self, error::ClientError> {
+        debug!(logger, "Creating session to {}", endpoint);
+        // Keep a copy for log messages; `endpoint` itself is consumed by the channel builder.
+        let peer_addr = endpoint.clone();
+
+        let builder = match Channel::from_shared(endpoint) {
+            Ok(builder) => builder,
+            Err(cause) => {
+                error!(logger, "Failed to create channel. Cause: {:?}", cause);
+                return Err(error::ClientError::Connect);
+            }
+        };
+
+        let builder = match builder.tls_config(ClientTlsConfig::default()) {
+            Ok(builder) => builder,
+            Err(cause) => {
+                error!(logger, "Failed to configure TLS. Cause: {:?}", cause);
+                return Err(error::ClientError::Connect);
+            }
+        };
+
+        let attempt = builder
+            .connect_timeout(std::time::Duration::from_secs(3))
+            .tcp_nodelay(true)
+            .connect()
+            .await;
+        let channel = match attempt {
+            Ok(channel) => channel,
+            Err(cause) => {
+                error!(logger, "Failed to connect to {}. Cause: {:?}", peer_addr, cause);
+                return Err(error::ClientError::Connect);
+            }
+        };
+
+        Ok(Session {
+            stub: MessagingServiceClient::new(channel),
+            logger: logger.new(o!("peer" => peer_addr)),
+        })
+    }
+
+    /// Issues a `QueryRoute` RPC on this session.
+    ///
+    /// A failed call is logged on the session logger and reported as
+    /// `ClientError::ClientInternal`; the underlying status is not propagated.
+    async fn query_route(
+        &mut self,
+        request: tonic::Request<pb::QueryRouteRequest>,
+    ) -> Result<tonic::Response<pb::QueryRouteResponse>, error::ClientError> {
+        let outcome = self.stub.query_route(request).await;
+        outcome.map_err(|cause| {
+            error!(self.logger, "QueryRoute failed. Cause: {:?}", cause);
+            error::ClientError::ClientInternal
+        })
+    }
+}
+
+#[derive(Debug)]
+struct SessionManager { // Handle to the background submit loop spawned in `SessionManager::new`.
+    logger: Logger,
+    tx: tokio::sync::mpsc::Sender<command::Command>, // Sending side of the command queue read by the submit loop.
+}
+
+impl SessionManager {
+    /// Creates the manager and spawns its background submit loop via `tokio::spawn`.
+    ///
+    /// The loop owns a per-peer cache of `Session`s. For each `QueryRoute` command it reuses
+    /// or lazily creates the session for `peer`, then runs the RPC on its own task so that a
+    /// slow request does not stall later commands. The loop ends when `recv` yields `None`.
+    fn new(logger: Logger) -> Self {
+        let (tx, mut rx) = tokio::sync::mpsc::channel(256);
+
+        let submitter_logger = logger.new(o!("component" => "submitter"));
+        tokio::spawn(async move {
+            let mut session_map: HashMap<String, Session> = HashMap::new();
+            while let Some(command) = rx.recv().await {
+                match command {
+                    command::Command::QueryRoute {
+                        peer,
+                        request,
+                        tx: reply,
+                    } => {
+                        // Take an owned clone up front so no borrow of the map is held across
+                        // the `await` below. Cloning Channel is cheap and encouraged:
+                        // https://docs.rs/tonic/0.7.2/tonic/transport/struct.Channel.html#multiplexing-requests
+                        let cached = session_map.get(&peer).cloned();
+                        let mut session = match cached {
+                            Some(session) => session,
+                            None => match Session::new(peer.clone(), &submitter_logger).await {
+                                Ok(session) => {
+                                    session_map.insert(peer, session.clone());
+                                    session
+                                }
+                                Err(e) => {
+                                    error!(
+                                        submitter_logger,
+                                        "Failed to create session to {}. Cause: {:?}", peer, e
+                                    );
+                                    // The requester may already be gone; ignoring is fine.
+                                    let _ = reply.send(Err(ClientError::Connect));
+                                    continue;
+                                }
+                            },
+                        };
+
+                        tokio::spawn(async move {
+                            let result = session.query_route(request).await;
+                            let _ = reply.send(result);
+                        });
+                    }
+                }
+            }
+            info!(submitter_logger, "Submit loop exit");
+        });
+
+        SessionManager { logger, tx }
+    }
+
+    /// Queries the route of `topic` from the server at `endpoint`.
+    ///
+    /// The request is signed with `client`'s metadata, handed to the submit loop and the
+    /// reply is awaited on a oneshot channel.
+    ///
+    /// # Errors
+    ///
+    /// * `ClientError::ClientInternal` if `client` can no longer be upgraded, if the command
+    ///   cannot be queued, or if the reply channel is dropped without an answer.
+    /// * Whatever error the submit loop reports for the request otherwise.
+    async fn route(
+        &self,
+        endpoint: &str,
+        topic: &str,
+        client: Weak<&Client>,
+    ) -> Result<Route, error::ClientError> {
+        // Build and sign the request in its own scope so the upgraded handle is released
+        // before any `await`.
+        let request = {
+            let client = client
+                .upgrade()
+                .ok_or(error::ClientError::ClientInternal)?;
+
+            let mut request = tonic::Request::new(QueryRouteRequest {
+                topic: Some(Resource {
+                    name: topic.to_owned(),
+                    resource_namespace: client.arn.clone(),
+                }),
+                endpoints: Some(client.access_point.clone()),
+            });
+            client.sign(request.metadata_mut());
+            request
+        };
+
+        let (reply_tx, reply_rx) = oneshot::channel();
+        let command = command::Command::QueryRoute {
+            peer: endpoint.to_owned(),
+            request,
+            tx: reply_tx,
+        };
+
+        // If the command cannot be queued, its reply sender is dropped with it, so waiting
+        // on the receiver could only yield a misleading secondary error. Fail right away.
+        if let Err(e) = self.tx.send(command).await {
+            error!(self.logger, "Failed to submit request. Cause: {}", e);
+            return Err(ClientError::ClientInternal);
+        }
+
+        match reply_rx.await {
+            // The response payload is not interpreted yet; `Route` is still a placeholder.
+            Ok(result) => result.map(|_response| Route {}),
+            Err(e) => {
+                error!(self.logger, "oneshot channel error. Cause: {:?}", e);
+                Err(ClientError::ClientInternal)
+            }
+        }
+    }
+}
+
+#[derive(Debug)]
+struct Route {} // Placeholder route value; it has no fields and is built without reading the response.
+
+#[derive(Debug)]
+enum RouteStatus { // Per-topic state stored in `Client::route_table`.
+    Querying(Vec<oneshot::Sender<Result<Arc<Route>, error::ClientError>>>), // Senders to notify once the query finishes.
+    Found(Arc<Route>), // Cached route shared with callers.
+}
+
+#[derive(Debug)]
+pub(crate) struct Client { // Crate-internal client state: sessions, cached routes and identity.
+    session_manager: SessionManager, // Used by `query_route` to submit route RPCs.
+    logger: Logger,
+    route_table: Mutex<HashMap<String /* topic */, RouteStatus>>, // Route cache / query state, guarded by a mutex.
+    arn: String, // Sent as `resource_namespace` in route queries; set to "" in `new`.
+    id: String, // Value of the `x-mq-client-id` metadata entry; produced by `client_id()`.
+    access_point: pb::Endpoints, // Addresses resolved from the `access_url` given to `new`.
+}
+
+static CLIENT_ID_SEQUENCE: AtomicUsize = AtomicUsize::new(0); // Counter appended to each id built by `Client::client_id`.
+
+impl Client {
+    /// Builds a client identifier of the form `host@pid#sequence`.
+    ///
+    /// Falls back to `localhost` when the host name cannot be read or is not valid Unicode.
+    /// The sequence part comes from `CLIENT_ID_SEQUENCE`, which is incremented on every call.
+    fn client_id() -> String {
+        let host = hostname::get()
+            .ok()
+            .and_then(|name| name.into_string().ok())
+            .unwrap_or_else(|| String::from("localhost"));
+
+        format!(
+            "{}@{}#{}",
+            host,
+            std::process::id(),
+            CLIENT_ID_SEQUENCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
+        )
+    }
+
+    /// Creates a client whose access point is the set of addresses `access_url` resolves to.
+    ///
+    /// The address scheme recorded on the access point is that of the last resolved address.
+    ///
+    /// # Errors
+    ///
+    /// Returns `ClientError::ClientInternal` when `access_url` cannot be resolved.
+    pub(crate) fn new(
+        logger: Logger,
+        access_url: impl std::net::ToSocketAddrs,
+    ) -> Result<Self, error::ClientError> {
+        let id = Self::client_id();
+        let mut access_point = pb::Endpoints {
+            scheme: pb::AddressScheme::IPv4 as i32,
+            addresses: vec![],
+        };
+
+        let resolved = access_url
+            .to_socket_addrs()
+            .map_err(|_| error::ClientError::ClientInternal)?;
+        for socket_addr in resolved {
+            access_point.scheme = if socket_addr.is_ipv4() {
+                pb::AddressScheme::IPv4 as i32
+            } else {
+                pb::AddressScheme::IPv6 as i32
+            };
+
+            access_point.addresses.push(pb::Address {
+                host: socket_addr.ip().to_string(),
+                // A `u16` port always fits in `i32`; `from` makes the widening explicit.
+                port: i32::from(socket_addr.port()),
+            });
+        }
+
+        Ok(Client {
+            session_manager: SessionManager::new(logger.new(o!("component" => "session_manager"))),
+            logger,
+            route_table: Mutex::new(HashMap::new()),
+            arn: String::from(""),
+            id,
+            access_point,
+        })
+    }
+
+    /// Returns the route for `topic`, querying the server when necessary.
+    ///
+    /// * If a route is cached and `lookup_cache` is true, the cached value is returned.
+    /// * If another caller is already querying this topic, this call registers itself as a
+    ///   waiter and receives that caller's result instead of issuing a second RPC.
+    /// * Otherwise this call performs the query, stores the result and notifies waiters.
+    ///
+    /// When the query fails but a route is cached, the cached route is returned.
+    ///
+    /// # Errors
+    ///
+    /// Returns `ClientError::ClientInternal` when the query fails and no cached route exists,
+    /// or when the querying caller goes away without delivering a result.
+    async fn query_route(
+        &self,
+        topic: &str,
+        lookup_cache: bool,
+    ) -> Result<Arc<Route>, error::ClientError> {
+        debug!(self.logger, "Query route for topic={}", topic);
+
+        // A vacant entry means this caller becomes the one that queries. An entry that is
+        // already `Querying` means someone else is, so this caller must wait. Telling these
+        // apart by "is the waiter list empty" does not work: the list starts out empty and
+        // would stay empty forever.
+        // The lock guard is a temporary of this statement and is released before any `await`.
+        let waiter = match self.route_table.lock().entry(topic.to_owned()) {
+            std::collections::hash_map::Entry::Vacant(slot) => {
+                slot.insert(RouteStatus::Querying(Vec::new()));
+                None
+            }
+            std::collections::hash_map::Entry::Occupied(slot) => match slot.into_mut() {
+                RouteStatus::Found(route) => {
+                    if lookup_cache {
+                        return Ok(Arc::clone(route));
+                    }
+                    // Forced refresh: query again while the cached route stays available.
+                    None
+                }
+                RouteStatus::Querying(waiters) => {
+                    let (tx, rx) = oneshot::channel();
+                    waiters.push(tx);
+                    Some(rx)
+                }
+            },
+        };
+
+        // NOTE(review): if the querying caller's future is dropped mid-flight, the `Querying`
+        // entry is not cleaned up and waiters stay pending. Confirm whether callers can be
+        // cancelled before relying on this.
+        if let Some(rx) = waiter {
+            return match rx.await {
+                Ok(route) => route,
+                Err(_e) => Err(error::ClientError::ClientInternal),
+            };
+        }
+
+        // `SessionManager::route` takes a `Weak<&Client>`; keep the strong handle alive until
+        // the call returns so the upgrade inside it succeeds.
+        let client = Arc::new(self);
+        let client_weak = Arc::downgrade(&client);
+        // TODO: derive the endpoint from `access_point` instead of this fixed address.
+        let endpoint = "https://127.0.0.1:8081";
+        let outcome = self
+            .session_manager
+            .route(endpoint, topic, client_weak)
+            .await;
+
+        match outcome {
+            Ok(route) => {
+                let route = Arc::new(route);
+                let prev = self
+                    .route_table
+                    .lock()
+                    .insert(topic.to_owned(), RouteStatus::Found(Arc::clone(&route)));
+
+                if let Some(RouteStatus::Querying(waiters)) = prev {
+                    for waiter in waiters {
+                        // A waiter that has gone away is not an error.
+                        let _ = waiter.send(Ok(Arc::clone(&route)));
+                    }
+                }
+                Ok(route)
+            }
+            Err(_e) => {
+                // One lock acquisition covers the whole failure path, so the table is never
+                // observed with a cached route temporarily missing.
+                let mut table = self.route_table.lock();
+                if let Some(RouteStatus::Found(route)) = table.get(topic) {
+                    return Ok(Arc::clone(route));
+                }
+                if let Some(RouteStatus::Querying(waiters)) = table.remove(topic) {
+                    for waiter in waiters {
+                        let _ = waiter.send(Err(error::ClientError::ClientInternal));
+                    }
+                }
+                Err(error::ClientError::ClientInternal)
+            }
+        }
+    }
+
+    /// Adds the client identification entries to outgoing request metadata.
+    ///
+    /// `x-mq-client-id` is skipped when the id cannot be converted to an ASCII metadata value;
+    /// the language, client version and protocol version entries are always set.
+    fn sign(&self, metadata: &mut tonic::metadata::MetadataMap) {
+        if let Ok(client_id) = tonic::metadata::AsciiMetadataValue::try_from(&self.id) {
+            metadata.insert("x-mq-client-id", client_id);
+        }
+
+        metadata.insert(
+            "x-mq-language",
+            tonic::metadata::AsciiMetadataValue::from_static("RUST"),
+        );
+        metadata.insert(
+            "x-mq-client-version",
+            tonic::metadata::AsciiMetadataValue::from_static("5.0.0"),
+        );
+        metadata.insert(
+            "x-mq-protocol-version",
+            tonic::metadata::AsciiMetadataValue::from_static("2.0.0"),
+        );
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use super::*;
+    use slog::Drain;
+
+    /// Builds a root logger that writes to the terminal through an async drain.
+    fn create_logger() -> Logger {
+        let term = slog_term::FullFormat::new(slog_term::TermDecorator::new().build())
+            .build()
+            .fuse();
+        let queued = slog_async::Async::new(term).build().fuse();
+        slog::Logger::root(queued, o!())
+    }
+
+    /// 256 consecutive ids must all be distinct.
+    #[test]
+    fn test_client_id() {
+        let mut seen = std::collections::HashSet::new();
+        for _ in 0..256 {
+            let id = Client::client_id();
+            assert_eq!(false, seen.contains(&id));
+            seen.insert(id);
+        }
+    }
+
+    /// Creating and immediately dropping a manager must not panic.
+    #[tokio::test]
+    async fn test_session_manager_new() {
+        let manager = SessionManager::new(create_logger());
+        drop(manager);
+    }
+}
diff --git a/rust/src/command.rs b/rust/src/command.rs
new file mode 100644
index 0000000..5a3f3fa
--- /dev/null
+++ b/rust/src/command.rs
@@ -0,0 +1,13 @@
+use crate::client::Client;
+use crate::error::ClientError;
+use crate::pb::{QueryRouteRequest, QueryRouteResponse};
+use tokio::sync::oneshot;
+use tonic::{Request, Response};
+
+pub(crate) enum Command { // Messages sent over the mpsc channel to the session manager's submit loop.
+    QueryRoute { // Run a QueryRoute RPC against `peer` and report the outcome on `tx`.
+        peer: String, // Endpoint string used both to connect and as the session cache key.
+        request: Request<QueryRouteRequest>, // Request as prepared by the sender of the command.
+        tx: oneshot::Sender<Result<Response<QueryRouteResponse>, ClientError>>, // Receives the RPC result or the connect error.
+    },
+}
diff --git a/rust/src/error.rs b/rust/src/error.rs
new file mode 100644
index 0000000..f31f5d4
--- /dev/null
+++ b/rust/src/error.rs
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use thiserror::Error;
+
+#[derive(Error, Debug)]
+pub enum ClientError { // Error type returned by the client-side APIs in this crate.
+    #[error("Failed to create session")]
+    Connect, // Building the channel, configuring TLS or connecting to the peer failed.
+
+    #[error("Client internal error")]
+    ClientInternal, // Any other failure: RPC error, address resolution, dropped reply channel.
+}
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
new file mode 100644
index 0000000..d974146
--- /dev/null
+++ b/rust/src/lib.rs
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#[path = "pb/apache.rocketmq.v2.rs"]
+mod pb; // Protobuf/gRPC bindings; src/pb/README.md says the files there are generated by tonic_build.
+
+mod error; // `ClientError`.
+
+mod command; // `Command` messages for the session manager's submit loop.
+
+mod client; // `Client`, `SessionManager` and route lookup.
+
+mod producer; // `Producer`, which owns a `Client`.
diff --git a/rust/src/main.rs b/rust/src/main.rs
new file mode 100644
index 0000000..d69ca78
--- /dev/null
+++ b/rust/src/main.rs
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/// Placeholder binary entry point: prints a greeting and exits.
+fn main() {
+    let greeting = "Hello, world!";
+    println!("{}", greeting);
+}
diff --git a/rust/src/pb/README.md b/rust/src/pb/README.md
new file mode 100644
index 0000000..e198bf0
--- /dev/null
+++ b/rust/src/pb/README.md
@@ -0,0 +1,5 @@
+## Note
+--------------------------------------------------------
+Files in this directory are generated by the `tonic_build` crate.
+
+**DO NOT EDIT THEM!**
\ No newline at end of file
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
new file mode 100644
index 0000000..685eb82
--- /dev/null
+++ b/rust/src/producer.rs
@@ -0,0 +1,25 @@
+use slog::Logger;
+
+use crate::{client, error};
+
+struct Producer { // Producer handle; currently just a wrapper around a `Client`.
+    client: client::Client, // Created in `Producer::new`.
+}
+
+impl Producer {
+    /// Creates a producer backed by a new `Client` for the fixed access point
+    /// `localhost:8081`.
+    ///
+    /// `topics` is iterated but its items are not used yet.
+    ///
+    /// # Errors
+    ///
+    /// Propagates the error from `Client::new`.
+    pub async fn new<T>(logger: Logger, topics: T) -> Result<Self, error::ClientError>
+    where
+        T: IntoIterator,
+        T::Item: AsRef<str>,
+    {
+        let client = client::Client::new(logger, "localhost:8081")?;
+
+        topics.into_iter().for_each(|_topic| {
+            // client.subscribe(topic.as_ref()).await;
+        });
+
+        Ok(Self { client })
+    }
+
+    /// Starts the producer. Currently a no-op.
+    pub fn start(&mut self) {}
+}