You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/03/24 08:54:46 UTC

[GitHub] [pulsar] cbornet commented on a change in pull request #6512: WIP: [protocol] Grpc protocol handler

cbornet commented on a change in pull request #6512: WIP: [protocol] Grpc protocol handler
URL: https://github.com/apache/pulsar/pull/6512#discussion_r396989609
 
 

 ##########
 File path: pulsar-protocols/grpc/src/main/proto/PulsarApi.proto
 ##########
 @@ -0,0 +1,888 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+syntax = "proto2";
+
+package pulsar.protocols.grpc;
+option java_package = "org.apache.pulsar.protocols.grpc.api";
+option java_multiple_files = true;
+// option optimize_for = LITE_RUNTIME;
+
+service Pulsar {
+    rpc produce(stream CommandSend) returns (stream SendResult) {}
+    // rpc consume(stream BaseCommand) returns (stream BaseCommand) {}
+    rpc get_schema(CommandGetSchema) returns (CommandGetSchemaResponse) {}
+    rpc lookup_topic(CommandLookupTopic) returns (CommandLookupTopicResponse) {}
+}
+
+message Schema {
+    enum Type {
+        None = 0;
+        String = 1;
+        Json = 2;
+        Protobuf = 3;
+        Avro = 4;
+        Bool = 5;
+        Int8 = 6;
+        Int16 = 7;
+        Int32 = 8;
+        Int64 = 9;
+        Float = 10;
+        Double = 11;
+        Date = 12;
+        Time = 13;
+        Timestamp = 14;
+        KeyValue = 15;
+    }
+
+    required string name = 1;
+    required bytes schema_data = 3;
+    required Type type = 4;
+    map<string, string> properties = 5;
+
+}
+
+message MessageIdData {
+    required uint64 ledgerId = 1;
+    required uint64 entryId  = 2;
+    optional int32 partition = 3 [default = -1];
+    optional int32 batch_index = 4 [default = -1];
+}
+
+message KeyValue {
+    required string key = 1;
+    required string value = 2;
+}
+
+message KeyLongValue {
+    required string key = 1;
+    required uint64 value = 2;
+}
+
+message IntRange {
+    required int32 start = 1;
+    required int32 end = 2;
+}
+
+message EncryptionKeys {
+    required string key = 1;
+    required bytes value = 2;
+    repeated KeyValue metadata = 3;
+}
+
+enum CompressionType {
+    NONE   = 0;
+    LZ4    = 1;
+    ZLIB   = 2;
+    ZSTD   = 3;
+    SNAPPY   = 4;
+}
+
+message SingleMessageMetadata {
+    repeated KeyValue properties    = 1;
+    optional string partition_key   = 2;
+    required int32 payload_size    = 3;
+    optional bool compacted_out     = 4 [default = false];
+
+    // The timestamp at which this event occurred. It is typically set by applications.
+    // If this field is omitted, `publish_time` can be used in place of `event_time`.
+    optional uint64 event_time = 5 [default = 0];
+    optional bool partition_key_b64_encoded = 6 [ default = false ];
+    // Specifies a key that overrides the message key used for ordering dispatch in Key_Shared mode.
+    optional bytes ordering_key = 7;
+    // Allows the consumer to retrieve the sequence id that the producer set.
+    optional uint64 sequence_id = 8;
+}
+
+enum ServerError {
+    UnknownError        = 0;
+    MetadataError       = 1; // Error with ZK/metadata
+    PersistenceError    = 2; // Error writing to or reading from BK
+    AuthenticationError = 3; // Invalid authentication
+    AuthorizationError  = 4; // Not authorized to use resource
+
+    ConsumerBusy        = 5; // Unable to subscribe/unsubscribe because
+                             // other consumers are connected
+    ServiceNotReady     = 6; // Any error that requires the client to retry the operation with a fresh lookup
+    ProducerBlockedQuotaExceededError = 7; // Unable to create producer because backlog quota exceeded
+    ProducerBlockedQuotaExceededException = 8; // Exception while creating producer because quota exceeded
+    ChecksumError = 9; // Error while verifying message checksum
+    UnsupportedVersionError = 10; // Error when an older client/version doesn't support a required feature
+    TopicNotFound = 11; // Topic not found
+    SubscriptionNotFound = 12; // Subscription not found
+    ConsumerNotFound = 13; // Consumer not found
+    TooManyRequests = 14; // Too many simultaneous requests
+    TopicTerminatedError = 15; // The topic has been terminated
+
+    ProducerBusy         = 16; // Producer with same name is already connected
+    InvalidTopicName = 17; // The topic name is not valid
+
+    IncompatibleSchema = 18; // Specified schema was incompatible with topic schema
+    ConsumerAssignError = 19; // Dispatcher assign consumer error
+
+    TransactionCoordinatorNotFound = 20; // Transaction coordinator not found error
+    InvalidTxnStatus = 21; // Invalid txn status error
+}
+
+enum AuthMethod {
+    AuthMethodNone   = 0;
+    AuthMethodYcaV1  = 1;
+    AuthMethodAthens = 2;
+}
+
+// Each protocol version identifies new features that are
+// incrementally added to the protocol
+enum ProtocolVersion {
+    v0 = 0;  // Initial versioning
+    v1 = 1;  // Added application keep-alive
+    v2 = 2;  // Added RedeliverUnacknowledgedMessages Command
+    v3 = 3;  // Added compression with LZ4 and ZLib
+    v4 = 4;  // Added batch message support
+    v5 = 5;  // Added disconnect client w/o closing connection
+    v6 = 6;  // Added checksum computation for metadata + payload
+    v7 = 7;  // Added CommandLookupTopic - Binary Lookup
+    v8 = 8;  // Added CommandConsumerStats - Client fetches broker side consumer stats
+    v9 = 9;  // Added end of topic notification
+    v10 = 10;// Added proxy to broker
+    v11 = 11;// C++ consumers before this version do not correctly handle the checksum field
+    v12 = 12;// Added get topic's last messageId from broker
+             // Added CommandActiveConsumerChange
+             // Added CommandGetTopicsOfNamespace
+    v13 = 13; // Schema-registry : added avro schema format for json
+    v14 = 14; // Added CommandAuthChallenge and CommandAuthResponse for mutual auth
+              // Added Key_Shared subscription
+    v15 = 15; // Added CommandGetOrCreateSchema and CommandGetOrCreateSchemaResponse
+}
+
+message CommandConnect {
+    optional string auth_method_name = 1;
+    optional bytes auth_data = 2;
+
+    // Client can ask to be proxied to a specific broker.
+    // This is only honored by a Pulsar proxy
+    optional string proxy_to_broker_url = 3;
+
+    // Original principal that was verified by
+    // a Pulsar proxy. In this case the auth info above
+    // will be the auth of the proxy itself
+    optional string original_principal = 7;
+
+    // Original auth role and auth method that were passed
+    // to the proxy. In this case the auth info above
+    // will be the auth of the proxy itself
+    optional string original_auth_data = 8;
+    optional string original_auth_method = 9;
+
+}
+
+message CommandAuth {
+    optional string auth_method = 1;
+    optional bytes auth_data    = 2;
+}
+
+message CommandConnected {
+    required string server_version  = 1;
+    optional int32 protocol_version = 2 [default = 0];
+    optional int32 max_message_size = 3;
+}
+
+message CommandAuthResponse {
+    optional string client_version  = 1;
+    optional AuthData response      = 2;
+    optional int32 protocol_version = 3 [default = 0];
+}
+
+message CommandAuthChallenge {
+    optional string server_version  = 1;
+    optional AuthData challenge     = 2;
+    optional int32 protocol_version = 3 [default = 0];
+}
+
+// Authentication payload used for mutual authentication (such as SASL); carried by CommandAuthChallenge and CommandAuthResponse.
+message AuthData {
+    optional string auth_method_name = 1;
+    optional int64 auth_state_id     = 2;
+    optional bytes auth_data         = 3;
+}
+
+message AuthRoleTokenInfo {
+    required string role      = 1;
+    optional int64 expires    = 2 [default = -1];
+    optional int64 session_id = 3;
+}
+
+message AuthRoleToken {
+    required AuthRoleTokenInfo roleInfo  = 1;
+    required bytes signature         = 2;
+}
+
+enum KeySharedMode {
+    AUTO_SPLIT = 0;
+    STICKY = 1;
+}
+
+message KeySharedMeta {
+    required KeySharedMode keySharedMode = 1;
+    repeated IntRange hashRanges         = 3;
+}
+
+message CommandSubscribe {
+    enum SubType {
+        Exclusive  = 0;
+        Shared     = 1;
+        Failover   = 2;
+        Key_Shared = 3;
+    }
+    required string topic         = 1;
+    required string subscription  = 2;
+    required SubType subType      = 3;
+
+    required uint64 consumer_id   = 4;
+    required uint64 request_id    = 5;
+    optional string consumer_name = 6;
+    optional int32 priority_level = 7;
+
+    // Signals whether the subscription should be backed by a
+    // durable cursor or not
+    optional bool durable         = 8 [default = true];
+    
+    // If specified, the subscription will position the cursor's
+    // mark-delete position on the particular message id and
+    // will send messages from that point
+    optional MessageIdData start_message_id = 9;
+
+    /// Add optional metadata key=value to this consumer
+    repeated KeyValue metadata    = 10;
+
+    optional bool read_compacted  = 11;
+
+    optional Schema schema        = 12;
+    enum InitialPosition {
+        Latest   = 0;
+        Earliest = 1;
+    }
+    // Signals whether the subscription will be initialized at the
+    // latest position or at the earliest position
+    optional InitialPosition initialPosition = 13 [default = Latest];
+
+    // Mark the subscription as "replicated". Pulsar will make sure
+    // to periodically sync the state of replicated subscriptions
+    // across different clusters (when using geo-replication).
+    optional bool replicate_subscription_state = 14;
+    
+    // If true, the subscribe operation will cause a topic to be
+    // created if it does not exist already (and if topic auto-creation
+    // is allowed by the broker).
+    // If false, the subscribe operation will fail if the topic
+    // does not exist.
+    optional bool force_topic_creation = 15 [default = true];
+
+    // If specified, the subscription will reset the cursor's position back
+    // to the specified seconds and will send messages from that point
+    optional uint64 start_message_rollback_duration_sec = 16 [default = 0];
+
+    optional KeySharedMeta keySharedMeta = 17;
+}
+
+message CommandPartitionedTopicMetadata {
+    required string topic                = 1;
+    required uint64 request_id           = 2;
+    // TODO - Remove original_principal, original_auth_data, original_auth_method
+    // Original principal that was verified by
+    // a Pulsar proxy.
+    optional string original_principal   = 3;
+
+    // Original auth role and auth Method that was passed
+    // to the proxy.
+    optional string original_auth_data   = 4;
+    optional string original_auth_method = 5;
+}
+
+message CommandPartitionedTopicMetadataResponse {
+    enum LookupType {
+        Success  = 0;
+        Failed   = 1;
+    }
+    optional uint32 partitions            = 1;  // Optional in case of error
+    required uint64 request_id            = 2;
+    optional LookupType response          = 3;
+    optional ServerError error            = 4;
+    optional string message               = 5;
+}
+
+message CommandLookupTopic {
+    required string topic            = 1;
+    optional bool authoritative      = 2 [default = false];
+}
+
+message CommandLookupTopicResponse {
+    enum LookupType {
+        Redirect = 0;
+        Connect  = 1;
+        Failed   = 2;
+    }
+
+    required string grpcServiceHost         = 1;
+    optional int32 grpcServicePort          = 2;
+    optional int32 grpcServicePortTls       = 3;
+    required LookupType response            = 4;
+    required bool authoritative             = 5 [default = false];
+
+    // If it's true, indicates to the client that it must
+    // always connect through the service url after the
+    // lookup has been completed.
+    required bool proxy_through_service_url = 6 [default = false];
+}
+
+/// Create a new Producer on a topic, assigning the given producer_id,
+/// all messages sent with this producer_id will be persisted on the topic
+message CommandProducer {
+    required string topic                     = 1;
+
+    /// If a producer name is specified, the name will be used,
+    /// otherwise the broker will generate a unique name
+    optional string producer_name             = 2;
+
+    optional bool encrypted                   = 3 [default = false];
+
+    /// Add optional metadata key=value to this producer
+    map<string, string> metadata              = 4;
+
+    optional Schema schema                    = 5;
+
+    // If the producer reconnects to the broker, the epoch of this producer is incremented by 1
+    optional uint64 epoch                     = 6 [default = 0];
+
+    // Indicates whether the name of the producer is generated or user provided.
+    // The default is true in order to be forward compatible with the client
+    optional bool user_provided_producer_name = 7 [default = true];
+}
+
+message CommandSend {
+    required uint64 sequence_id = 2;
+    optional int32 num_messages = 3 [default = 1];
+    optional uint64 txnid_least_bits = 4 [default = 0];
+    optional uint64 txnid_most_bits = 5 [default = 0];
+
+    /// Add highest sequence id to support batch message with external sequence id
+    optional uint64 highest_sequence_id = 6 [default = 0];
+
+    // Opaque payload and headers
+    required bytes headers_and_payload = 7;
+}
+
+message CommandSendReceipt {
+    required uint64 sequence_id         = 1;
+    optional MessageIdData message_id   = 2;
+    optional uint64 highest_sequence_id = 3 [default = 0];
+}
+
+message CommandSendError {
+    required uint64 sequence_id = 1;
+    required ServerError error  = 2;
+    required string message     = 3;
+}
+
+message SendResult {
+    oneof send_result_oneof {
+        CommandProducerSuccess producer_success = 1;
+        CommandSendReceipt send_receipt         = 2;
+        CommandSendError send_error             = 3;
+    }
+}
+
+message CommandMessage {
+    required uint64 consumer_id       = 1;
+    required MessageIdData message_id = 2;
+    optional uint32 redelivery_count  = 3 [default = 0];
+}
+
+message CommandAck {
+    enum AckType {
+        Individual = 0;
+        Cumulative = 1;
+    }
+
+    required uint64 consumer_id       = 1;
+    required AckType ack_type         = 2;
+
+    // In case of individual acks, the client can pass a list of message ids
+    repeated MessageIdData message_id = 3;
+
+    // Acks can contain a flag to indicate the consumer
+    // received an invalid message that got discarded
+    // before being passed on to the application.
+    enum ValidationError {
+        UncompressedSizeCorruption = 0;
+        DecompressionError = 1;
+        ChecksumMismatch = 2;
+        BatchDeSerializeError = 3;
+        DecryptionError = 4;
+    }
+
+    optional ValidationError validation_error = 4;
+    repeated KeyLongValue properties = 5;
+
+    optional uint64 txnid_least_bits = 6 [default = 0];
+    optional uint64 txnid_most_bits = 7 [default = 0];
+}
+
+message CommandAckResponse {
+    required uint64 consumer_id = 1;
+    optional uint64 txnid_least_bits  = 2 [default = 0];
+    optional uint64 txnid_most_bits = 3 [default = 0];
+    optional ServerError error = 4;
+    optional string message = 5;
+}
+
+// changes on active consumer
+message CommandActiveConsumerChange {
+        required uint64 consumer_id    = 1;
+        optional bool is_active     = 2 [default = false];
+}
+
+message CommandFlow {
+    required uint64 consumer_id       = 1;
+
+    // Max number of messages to prefetch, in addition
+    // to any number previously specified
+    required uint32 messagePermits     = 2;
+}
+
+message CommandUnsubscribe {
+    required uint64 consumer_id = 1;
+    required uint64 request_id  = 2;
+}
+
+// Reset an existing consumer to a particular message id
+message CommandSeek {
+    required uint64 consumer_id = 1;
+    required uint64 request_id  = 2;
+
+    optional MessageIdData message_id = 3;
+    optional uint64 message_publish_time = 4;
+}
+
+// Message sent by broker to client when a topic
+// has been forcefully terminated and there are no more
+// messages left to consume
+message CommandReachedEndOfTopic {
+    required uint64 consumer_id = 1;
+}
+
+message CommandCloseProducer {
+    required uint64 producer_id = 1;
+    required uint64 request_id = 2;
+}
+
+message CommandCloseConsumer {
+    required uint64 consumer_id = 1;
+    required uint64 request_id = 2;
+}
+
+message CommandRedeliverUnacknowledgedMessages {
+    required uint64 consumer_id = 1;
+    repeated MessageIdData message_ids = 2;
+}
+
+message CommandSuccess {
+    required uint64 request_id = 1;
+    optional Schema schema = 2;
+}
+
+message CommandProducerSuccess {
+    required string producer_name    = 1;
+
+    // The last sequence id that was stored by this producer in the previous session
+    // This will only be meaningful if deduplication has been enabled.
+    optional int64  last_sequence_id = 2 [default = -1];
+    optional bytes schema_version    = 3;
+}
+
+message CommandError {
+    required ServerError error = 1;
+    required string message    = 2;
+}
+
+// Commands to probe the state of connection.
+// When either client or broker doesn't receive commands for certain
+// amount of time, they will send a Ping probe.
+message CommandPing {
+}
+message CommandPong {
+}
+
+message CommandConsumerStats {
+        required uint64 request_id         = 1;
+        // required string topic_name         = 2;
+        // required string subscription_name  = 3;
+        required uint64 consumer_id        = 4;
+}
+
+message CommandConsumerStatsResponse {
+        required uint64 request_id              = 1;
+        optional ServerError error_code         = 2;
+        optional string error_message           = 3;
+
+        /// Total rate of messages delivered to the consumer. msg/s
+        optional double msgRateOut                  = 4;
+
+        /// Total throughput delivered to the consumer. bytes/s
+        optional double msgThroughputOut            = 5;
+
+        /// Total rate of messages redelivered by this consumer. msg/s
+        optional double msgRateRedeliver            = 6;
+
+        /// Name of the consumer
+        optional string consumerName                = 7;
+
+        /// Number of available message permits for the consumer
+        optional uint64 availablePermits            = 8;
+
+        /// Number of unacknowledged messages for the consumer
+        optional uint64 unackedMessages             = 9;
+
+        /// Flag to verify if consumer is blocked due to reaching threshold of unacked messages
+        optional bool blockedConsumerOnUnackedMsgs  = 10;
+
+        /// Address of this consumer
+        optional string address                     = 11;
+
+        /// Timestamp of connection
+        optional string connectedSince              = 12;
+
+        /// Whether this subscription is Exclusive or Shared or Failover
+        optional string type                        = 13;
+
+        /// Total rate of messages expired on this subscription. msg/s
+        optional double msgRateExpired              = 14;
+
+        /// Number of messages in the subscription backlog
+        optional uint64 msgBacklog                  = 15;
+}
+
+message CommandGetLastMessageId {
+    required uint64 consumer_id = 1;
+    required uint64 request_id  = 2;
+}
+
+message CommandGetLastMessageIdResponse {
+    required MessageIdData last_message_id = 1;
+    required uint64 request_id  = 2;
+}
+
+message CommandGetTopicsOfNamespace {
+    enum Mode {
+        PERSISTENT = 0;
+        NON_PERSISTENT = 1;
+        ALL = 2;
+    }
+    required uint64 request_id    = 1;
+    required string namespace    = 2;
+    optional Mode mode = 3 [default = PERSISTENT];
+}
+
+message CommandGetTopicsOfNamespaceResponse {
+    required uint64 request_id    = 1;
+    repeated string topics         = 2;
+}
+
+message CommandGetSchema {
+    required string topic         = 1;
+    optional bytes schema_version = 2;
+}
+
+message CommandGetSchemaResponse {
+    optional Schema schema          = 1;
+    optional bytes schema_version   = 2;
+}
+
+message CommandGetOrCreateSchema {
+    required uint64 request_id = 1;
+    required string topic      = 2;
+    required Schema schema     = 3;
+}
+
+message CommandGetOrCreateSchemaResponse {
+    required uint64 request_id      = 1;
+    optional ServerError error_code = 2;
+    optional string error_message   = 3;
+
+    optional bytes schema_version   = 4;
+}
+
+/// --- transaction related ---
+
+enum TxnAction {
+    COMMIT = 0;
+    ABORT = 1;
+}
+
+message CommandNewTxn {
+    required uint64 request_id = 1;
+    optional uint64 txn_ttl_seconds = 2 [default = 0];
+    optional uint64 tc_id = 3 [default = 0];
+}
+
+message CommandNewTxnResponse {
+    required uint64 request_id = 1;
+    optional uint64 txnid_least_bits = 2 [default = 0];
+    optional uint64 txnid_most_bits = 3 [default = 0];
+    optional ServerError error = 4;
+    optional string message = 5;
+}
+
+message CommandAddPartitionToTxn {
+    required uint64 request_id = 1;
+    optional uint64 txnid_least_bits = 2 [default = 0];
+    optional uint64 txnid_most_bits = 3 [default = 0];
+    repeated string partitions = 4;
+}
+
+message CommandAddPartitionToTxnResponse {
+    required uint64 request_id = 1;
+    optional uint64 txnid_least_bits = 2 [default = 0];
+    optional uint64 txnid_most_bits = 3 [default = 0];
+    optional ServerError error = 4;
+    optional string message = 5;
+}
+
+message Subscription {
+    required string topic = 1;
+    required string subscription = 2;
+}
+message CommandAddSubscriptionToTxn {
+    required uint64 request_id = 1;
+    optional uint64 txnid_least_bits = 2 [default = 0];
+    optional uint64 txnid_most_bits = 3 [default = 0];
+    repeated Subscription subscription = 4;
+}
+
+message CommandAddSubscriptionToTxnResponse {
+    required uint64 request_id = 1;
+    optional uint64 txnid_least_bits = 2 [default = 0];
+    optional uint64 txnid_most_bits = 3 [default = 0];
+    optional ServerError error = 4;
+    optional string message = 5;
+}
+
+message CommandEndTxn {
+    required uint64 request_id = 1;
+    optional uint64 txnid_least_bits = 2 [default = 0];
+    optional uint64 txnid_most_bits = 3 [default = 0];
+    optional TxnAction txn_action = 4;
+}
+
+message CommandEndTxnResponse {
+    required uint64 request_id = 1;
+    optional uint64 txnid_least_bits = 2 [default = 0];
+    optional uint64 txnid_most_bits = 3 [default = 0];
+    optional ServerError error = 4;
+    optional string message = 5;
+}
+
+message CommandEndTxnOnPartition {
+    required uint64 request_id = 1;
+    optional uint64 txnid_least_bits = 2 [default = 0];
+    optional uint64 txnid_most_bits = 3 [default = 0];
+    optional string topic = 4;
+    optional TxnAction txn_action = 5;
+}
+
+message CommandEndTxnOnPartitionResponse {
+    required uint64 request_id = 1;
+    optional uint64 txnid_least_bits = 2 [default = 0];
+    optional uint64 txnid_most_bits = 3 [default = 0];
+    optional ServerError error = 4;
+    optional string message = 5;
+}
+
+message CommandEndTxnOnSubscription {
+    required uint64 request_id = 1;
+    optional uint64 txnid_least_bits = 2 [default = 0];
+    optional uint64 txnid_most_bits = 3 [default = 0];
+    optional Subscription subscription= 4;
+    optional TxnAction txn_action = 5;
+}
+
+message CommandEndTxnOnSubscriptionResponse {
+    required uint64 request_id = 1;
+    optional uint64 txnid_least_bits = 2 [default = 0];
+    optional uint64 txnid_most_bits = 3 [default = 0];
+    optional ServerError error = 4;
+    optional string message = 5;
+}
+
+message BaseCommand {
 
 Review comment:
   BaseCommand is not used by the gRPC API. It's an unused definition inherited from PulsarApi and can be cleaned up.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services