You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:39 UTC
[pulsar-client-go] 02/38: Initial import
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit 84cb32ea75fbc322d107995620681e0d6feff202
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Mar 27 09:34:18 2019 -0700
Initial import
---
.gitignore | 1 +
pulsar/client.go | 111 +
pulsar/consumer.go | 179 ++
pulsar/error.go | 80 +
pulsar/impl/batch_builder.go | 23 +
pulsar/impl/buffer.go | 119 ++
pulsar/impl/buffer_test.go | 19 +
pulsar/impl/closable.go | 5 +
pulsar/impl/commands.go | 33 +
pulsar/impl/connection.go | 293 +++
pulsar/impl/connection_pool.go | 54 +
pulsar/impl/connection_reader.go | 117 +
pulsar/impl/default_router.go | 26 +
pulsar/impl/lookup_service.go | 62 +
pulsar/impl/rpc_client.go | 71 +
pulsar/impl/topic_name.go | 88 +
pulsar/impl/topic_name_test.go | 68 +
pulsar/impl_client.go | 98 +
pulsar/impl_partition_producer.go | 88 +
pulsar/impl_producer.go | 123 ++
pulsar/message.go | 90 +
pulsar/producer.go | 187 ++
pulsar/pulsar_proto/PulsarApi.pb.go | 4043 +++++++++++++++++++++++++++++++++++
pulsar/reader.go | 84 +
24 files changed, 6062 insertions(+)
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..723ef36
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+.idea
\ No newline at end of file
diff --git a/pulsar/client.go b/pulsar/client.go
new file mode 100644
index 0000000..21db637
--- /dev/null
+++ b/pulsar/client.go
@@ -0,0 +1,111 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import (
+ "time"
+)
+
+// NewClient creates a Client configured with the given options.
+// It delegates directly to newClient and returns its result unchanged.
+func NewClient(options ClientOptions) (Client, error) {
+ return newClient(options)
+}
+
+// Authentication is an opaque interface that represents the authentication credentials.
+// It declares no methods, so any value satisfies it.
+type Authentication interface {}
+
+// NewAuthenticationToken creates a new Authentication provider with the specified auth token.
+// NOTE: not implemented yet; the argument is ignored and nil is returned.
+func NewAuthenticationToken(token string) Authentication {
+ // TODO: return newAuthenticationToken(token)
+ return nil
+}
+
+// NewAuthenticationTokenSupplier creates a new Authentication provider with the specified
+// auth token supplier.
+// NOTE: not implemented yet; the argument is ignored and nil is returned.
+func NewAuthenticationTokenSupplier(tokenSupplier func() string) Authentication {
+ // TODO: return newAuthenticationTokenSupplier(tokenSupplier)
+ return nil
+}
+
+// NewAuthenticationTLS creates a new Authentication provider with the specified TLS
+// certificate and private key.
+// NOTE: not implemented yet; the arguments are ignored and nil is returned.
+func NewAuthenticationTLS(certificatePath string, privateKeyPath string) Authentication {
+ // TODO: return newAuthenticationTLS(certificatePath, privateKeyPath)
+ return nil
+}
+
+// NewAuthenticationAthenz creates a new Athenz Authentication provider with configuration
+// in JSON form.
+// NOTE: not implemented yet; the argument is ignored and nil is returned.
+func NewAuthenticationAthenz(authParams string) Authentication {
+ // TODO: return newAuthenticationAthenz(authParams)
+ return nil
+}
+
+// ClientOptions holds the settings used to construct a Pulsar Client instance.
+type ClientOptions struct {
+ // Configure the service URL for the Pulsar service.
+ // This parameter is required.
+ URL string
+
+ // NOTE(review): presumably the timeout for establishing a connection to the broker;
+ // it is not documented or used in the visible code - confirm.
+ ConnectionTimeout time.Duration
+
+ // Set the operation timeout (default: 30 seconds).
+ // Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the
+ // operation will be marked as failed.
+ OperationTimeout time.Duration
+
+ // Configure the authentication provider (default: no authentication).
+ // This is an embedded field, so it is set by its type name.
+ // Example: `Authentication: NewAuthenticationTLS("my-cert.pem", "my-key.pem")`
+ Authentication
+
+ // Set the path to the trusted TLS certificate file.
+ TLSTrustCertsFilePath string
+
+ // Configure whether the Pulsar client accepts untrusted TLS certificates from the broker (default: false).
+ TLSAllowInsecureConnection bool
+
+ // Configure whether the Pulsar client verifies the validity of the host name from the broker (default: false).
+ TLSValidateHostname bool
+}
+
+// Client is the entry point for creating producers, consumers and readers
+// against a Pulsar service.
+type Client interface {
+ // Create the producer instance.
+ // This method will block until the producer is created successfully.
+ CreateProducer(ProducerOptions) (Producer, error)
+
+ // Create a `Consumer` by subscribing to a topic.
+ //
+ // If the subscription does not exist, a new subscription will be created and all messages published after the
+ // creation will be retained until acknowledged, even if the consumer is not connected.
+ Subscribe(ConsumerOptions) (Consumer, error)
+
+ // Create a Reader instance.
+ // This method will block until the reader is created successfully.
+ CreateReader(ReaderOptions) (Reader, error)
+
+ // Fetch the list of partitions for a given topic.
+ //
+ // If the topic is partitioned, this will return a list of partition names.
+ // If the topic is not partitioned, the returned list will contain the topic
+ // name itself.
+ //
+ // This can be used to discover the partitions and create Reader, Consumer
+ // or Producer instances directly on a particular partition.
+ TopicPartitions(topic string) ([]string, error)
+
+ // Close the Client and free associated resources.
+ Close() error
+}
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
new file mode 100644
index 0000000..b32e5fc
--- /dev/null
+++ b/pulsar/consumer.go
@@ -0,0 +1,179 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import (
+ "context"
+ "time"
+)
+
+// ConsumerMessage is a pair of a Consumer and a Message.
+// Both are embedded, so their methods are promoted onto ConsumerMessage.
+type ConsumerMessage struct {
+ Consumer
+ Message
+}
+
+// SubscriptionType enumerates the types of subscription supported by Pulsar.
+type SubscriptionType int
+
+const (
+ // There can be only 1 consumer on the same topic with the same subscription name.
+ // This is the zero value of SubscriptionType.
+ Exclusive SubscriptionType = iota
+
+ // Multiple consumers will be able to use the same subscription name and the messages will be dispatched
+ // according to a round-robin rotation between the connected consumers.
+ Shared
+
+ // Multiple consumers will be able to use the same subscription name but only 1 consumer will receive the
+ // messages. If that consumer disconnects, one of the other connected consumers will start receiving messages.
+ Failover
+)
+
+// InitialPosition selects where a new subscription starts consuming from.
+type InitialPosition int
+
+const (
+ // Latest position, which means the start consuming position will be the last message.
+ // This is the zero value of InitialPosition.
+ Latest InitialPosition = iota
+
+ // Earliest position, which means the start consuming position will be the first message.
+ Earliest
+)
+
+// ConsumerOptions is used to configure and create instances of Consumer.
+type ConsumerOptions struct {
+ // Specify the topic this consumer will subscribe on.
+ // Either a topic, a list of topics or a topics pattern is required when subscribing.
+ Topic string
+
+ // Specify a list of topics this consumer will subscribe on.
+ // Either a topic, a list of topics or a topics pattern is required when subscribing.
+ Topics []string
+
+ // Specify a regular expression to subscribe to multiple topics under the same namespace.
+ // Either a topic, a list of topics or a topics pattern is required when subscribing.
+ TopicsPattern string
+
+ // Specify the subscription name for this consumer.
+ // This argument is required when subscribing.
+ SubscriptionName string
+
+ // Attach a set of application defined properties to the consumer.
+ // These properties will be visible in the topic stats.
+ Properties map[string]string
+
+ // Set the timeout for unacked messages.
+ // Messages not acknowledged within the given time will be replayed by the broker to the same or a different
+ // consumer.
+ // Default is 0, which means messages are not being replayed based on ack time.
+ AckTimeout time.Duration
+
+ // Select the subscription type to be used when subscribing to the topic.
+ // Default is `Exclusive`.
+ Type SubscriptionType
+
+ // InitialPosition at which the cursor will be set when subscribing.
+ // Default is `Latest`.
+ SubscriptionInitPos InitialPosition
+
+ // Sets a `MessageChannel` for the consumer.
+ // When a message is received, it will be pushed to the channel for consumption.
+ MessageChannel chan ConsumerMessage
+
+ // Sets the size of the consumer receive queue.
+ // The consumer receive queue controls how many messages can be accumulated by the `Consumer` before the
+ // application calls `Consumer.Receive()`. Using a higher value could potentially increase the consumer
+ // throughput at the expense of bigger memory utilization.
+ // Default value is `1000` messages and should be good for most use cases.
+ // Set to -1 to disable prefetching in the consumer.
+ ReceiverQueueSize int
+
+ // Set the max total receiver queue size across partitions.
+ // This setting will be used to reduce the receiver queue size for individual partitions
+ // (see ReceiverQueueSize) if the total exceeds this value (default: 50000).
+ MaxTotalReceiverQueueSizeAcrossPartitions int
+
+ // Set the consumer name.
+ Name string
+
+ // If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog
+ // of the topic. This means that, if the topic has been compacted, the consumer will only see the latest value for
+ // each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
+ // point, the messages will be sent as normal.
+ //
+ // ReadCompacted can only be enabled on subscriptions to persistent topics which have a single active consumer
+ // (i.e. failover or exclusive subscriptions). Attempting to enable it on subscriptions to non-persistent topics
+ // or on a shared subscription will cause the subscribe call to fail.
+ ReadCompacted bool
+}
+
+// Consumer is an interface that abstracts the behavior of Pulsar's consumer.
+type Consumer interface {
+ // Get the topic for the consumer.
+ Topic() string
+
+ // Get the subscription name for the consumer.
+ Subscription() string
+
+ // Unsubscribe the consumer.
+ Unsubscribe() error
+
+ // Receives a single message.
+ // This call blocks until a message is available.
+ Receive(context.Context) (Message, error)
+
+ // Ack the consumption of a single message.
+ Ack(Message) error
+
+ // Ack the consumption of a single message, identified by its MessageID.
+ AckID(MessageID) error
+
+ // Ack the reception of all the messages in the stream up to (and including) the provided message.
+ // This method will block until the acknowledgement has been sent to the broker. After that, the messages will
+ // not be re-delivered to this consumer.
+ //
+ // Cumulative acknowledgement cannot be used when the subscription type is set to `Shared`.
+ AckCumulative(Message) error
+
+ // Ack the reception of all the messages in the stream up to (and including) the message identified by the
+ // provided MessageID.
+ // This method will block until the acknowledgement has been sent to the broker. After that, the messages will
+ // not be re-delivered to this consumer.
+ //
+ // Cumulative acknowledgement cannot be used when the subscription type is set to `Shared`.
+ AckCumulativeID(MessageID) error
+
+ // Close the consumer and stop the broker from pushing more messages.
+ Close() error
+
+ // Reset the subscription associated with this consumer to a specific message id.
+ // The message id can either be a specific message or represent the first or last messages in the topic.
+ //
+ // Note: this operation can only be done on non-partitioned topics. For partitioned topics, one can rather
+ // perform the seek on the individual partitions.
+ Seek(msgID MessageID) error
+
+ // Redelivers all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is not
+ // active for the given topic. In Shared mode, the consumer's messages to be redelivered are distributed across
+ // all the connected consumers. This is a non-blocking call and does not return an error. In case the connection
+ // breaks, the messages are redelivered after reconnect.
+ RedeliverUnackedMessages()
+}
diff --git a/pulsar/error.go b/pulsar/error.go
new file mode 100644
index 0000000..929b250
--- /dev/null
+++ b/pulsar/error.go
@@ -0,0 +1,80 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+import "C"
+import "fmt"
+
+// Result is the numeric code that identifies the kind of failure carried by an Error.
+// The declared codes start at 1; the zero value is not assigned to any of them.
+type Result int
+
+const (
+ UnknownError Result = 1 // Unknown error happened on broker
+ InvalidConfiguration Result = 2 // Invalid configuration
+ TimeoutError Result = 3 // Operation timed out
+ LookupError Result = 4 // Broker lookup failed
+ ConnectError Result = 5 // Failed to connect to broker
+ ReadError Result = 6 // Failed to read from socket
+ AuthenticationError Result = 7 // Authentication failed on broker
+ AuthorizationError Result = 8 // Client is not authorized to create producer/consumer
+ ErrorGettingAuthenticationData Result = 9 // Client cannot find authorization data
+ BrokerMetadataError Result = 10 // Broker failed in updating metadata
+ BrokerPersistenceError Result = 11 // Broker failed to persist entry
+ ChecksumError Result = 12 // Corrupt message checksum failure
+ ConsumerBusy Result = 13 // Exclusive consumer is already connected
+ NotConnectedError Result = 14 // Producer/Consumer is not currently connected to broker
+ AlreadyClosedError Result = 15 // Producer/Consumer is already closed and not accepting any operation
+ InvalidMessage Result = 16 // Error in publishing an already used message
+ ConsumerNotInitialized Result = 17 // Consumer is not initialized
+ ProducerNotInitialized Result = 18 // Producer is not initialized
+ TooManyLookupRequestException Result = 19 // Too many concurrent lookup requests
+ InvalidTopicName Result = 20 // Invalid topic name
+ InvalidUrl Result = 21 // Client initialized with invalid broker URL (VIP URL passed to client constructor)
+ ServiceUnitNotReady Result = 22 // Service unit unloaded between client lookup and producer/consumer creation
+ OperationNotSupported Result = 23 // Operation not supported
+ ProducerBlockedQuotaExceededError Result = 24 // Producer is blocked
+ ProducerBlockedQuotaExceededException Result = 25 // Producer is getting exception
+ ProducerQueueIsFull Result = 26 // Producer queue is full
+ MessageTooBig Result = 27 // Trying to send a message exceeding the max size
+ TopicNotFound Result = 28 // Topic not found
+ SubscriptionNotFound Result = 29 // Subscription not found
+ ConsumerNotFound Result = 30 // Consumer not found
+ UnsupportedVersionError Result = 31 // Error when an older client/version doesn't support a required feature
+ TopicTerminated Result = 32 // Topic was already terminated
+ CryptoError Result = 33 // Error when crypto operation fails
+)
+
+// Error is the error type of this package: a message plus the Result code
+// that classifies the failure.
+type Error struct {
+ msg string
+ result Result
+}
+
+// Result returns the Result code stored in the error.
+func (e *Error) Result() Result {
+ return e.result
+}
+
+// Error returns the stored message, satisfying the built-in error interface.
+func (e *Error) Error() string {
+ return e.msg
+}
+
+// newError builds an *Error for the given result code. The stored message is
+// msg followed by ": " and the numeric value of the code.
+func newError(result Result, msg string) error {
+	e := new(Error)
+	e.result = result
+	e.msg = fmt.Sprintf("%s: %d", msg, result)
+	return e
+}
\ No newline at end of file
diff --git a/pulsar/impl/batch_builder.go b/pulsar/impl/batch_builder.go
new file mode 100644
index 0000000..7eb2030
--- /dev/null
+++ b/pulsar/impl/batch_builder.go
@@ -0,0 +1,23 @@
+package impl
+
+// BatchBuilder owns the Buffer allocated for it by NewBatchBuilder.
+// NOTE(review): presumably meant to accumulate messages into a batch; the
+// methods visible here are still stubs.
+type BatchBuilder struct {
+ buffer Buffer
+}
+
+// NewBatchBuilder returns a BatchBuilder backed by a freshly allocated
+// 4096-byte Buffer.
+func NewBatchBuilder() *BatchBuilder {
+	bb := new(BatchBuilder)
+	bb.buffer = NewBuffer(4096)
+	return bb
+}
+
+// isFull is a stub: it always returns false.
+func (bb *BatchBuilder) isFull() bool {
+ return false
+}
+
+// hasSpace is a stub: it ignores size and always returns false.
+func (bb *BatchBuilder) hasSpace(size int) bool {
+ return false
+}
+
+// Flush is a stub: it always returns nil.
+func (bb *BatchBuilder) Flush() []byte {
+ return nil
+}
diff --git a/pulsar/impl/buffer.go b/pulsar/impl/buffer.go
new file mode 100644
index 0000000..8e81f0c
--- /dev/null
+++ b/pulsar/impl/buffer.go
@@ -0,0 +1,119 @@
+package impl
+
+import "encoding/binary"
+
+// Buffer is a byte buffer with independent read and write positions.
+// The comments below describe the behavior of the buffer implementation in this file.
+type Buffer interface {
+ // Number of bytes that were written and not yet read
+ ReadableBytes() uint32
+
+ // Number of bytes that can still be written after the writer index
+ WritableBytes() uint32
+
+ // Total size of the underlying storage
+ Capacity() uint32
+
+ // Whether at least one more byte can be written
+ IsWritable() bool
+
+ // Return the next `size` bytes and advance the reader index past them
+ Read(size uint32) []byte
+
+ // Return the bytes between the reader and writer index, without consuming them
+ ReadableSlice() []byte
+
+ // Return the portion of the storage that follows the writer index
+ WritableSlice() []byte
+
+ // Advance the writer index when data was written in a slice
+ WrittenBytes(size uint32)
+
+ // Copy the available portion of data at the beginning of the buffer
+ MoveToFront()
+
+ // Read a big-endian uint32 and advance the reader index by 4
+ ReadUint32() uint32
+
+ // Write a big-endian uint32 and advance the writer index by 4
+ WriteUint32(n uint32)
+
+ // Copy `s` at the writer index and advance the writer index by len(s)
+ Write(s []byte)
+
+ // Replace the storage with one of `newSize` bytes, keeping the readable data at the front
+ Resize(newSize uint32)
+
+ // Reset both the reader and the writer index to 0
+ Clear()
+}
+
+// buffer is the Buffer implementation: a byte slice plus a reader and a
+// writer index into it.
+type buffer struct {
+ data []byte
+
+ // Offset of the next byte to read
+ readerIdx uint32
+ // Offset of the next byte to write
+ writerIdx uint32
+}
+
+// NewBuffer returns an empty Buffer backed by `size` bytes of storage.
+func NewBuffer(size int) Buffer {
+	// Reader and writer index start at their zero value
+	b := new(buffer)
+	b.data = make([]byte, size)
+	return b
+}
+
+// ReadableBytes returns the distance between the writer and the reader index.
+func (b *buffer) ReadableBytes() uint32 {
+ return b.writerIdx - b.readerIdx
+}
+
+// WritableBytes returns the space left between the writer index and the end of the storage.
+func (b *buffer) WritableBytes() uint32 {
+ return uint32(cap(b.data)) - b.writerIdx
+}
+
+// Capacity returns the capacity of the underlying slice.
+func (b *buffer) Capacity() uint32 {
+ return uint32(cap(b.data))
+}
+
+// IsWritable reports whether WritableBytes is greater than zero.
+func (b *buffer) IsWritable() bool {
+ return b.WritableBytes() > 0
+}
+
+// Read returns the next `size` bytes starting at the reader index and moves
+// the reader index past them. The returned slice aliases the buffer storage.
+// The requested size is not checked against the writer index.
+func (b *buffer) Read(size uint32) []byte {
+	end := b.readerIdx + size
+	chunk := b.data[b.readerIdx:end]
+	b.readerIdx = end
+	return chunk
+}
+
+// ReadableSlice returns the bytes between the reader and the writer index
+// without moving either index. The slice aliases the buffer storage.
+func (b *buffer) ReadableSlice() []byte {
+ return b.data[b.readerIdx:b.writerIdx]
+}
+
+// WritableSlice returns the storage from the writer index to the end.
+// After writing into it, call WrittenBytes to advance the writer index.
+func (b *buffer) WritableSlice() []byte {
+ return b.data[b.writerIdx:]
+}
+
+// WrittenBytes advances the writer index by size. The size is not validated.
+func (b *buffer) WrittenBytes(size uint32) {
+ b.writerIdx += size
+}
+
+// MoveToFront shifts the readable bytes to offset 0 of the storage, so that
+// the reader index becomes 0 and the writer index the number of readable bytes.
+func (b *buffer) MoveToFront() {
+	n := b.ReadableBytes()
+	// The built-in copy handles overlapping source and destination
+	copy(b.data, b.data[b.readerIdx:b.readerIdx+n])
+	b.readerIdx, b.writerIdx = 0, n
+}
+
+// Resize replaces the storage with a new one of newSize bytes and moves the
+// readable bytes to its beginning (reader index 0, writer index = readable bytes).
+//
+// If newSize is smaller than the number of readable bytes, the new storage is
+// sized to hold exactly the readable bytes instead: truncating would leave the
+// writer index pointing past the end of the storage.
+func (b *buffer) Resize(newSize uint32) {
+	size := b.ReadableBytes()
+	if newSize < size {
+		newSize = size
+	}
+
+	newData := make([]byte, newSize)
+	copy(newData, b.Read(size))
+	b.data = newData
+	b.readerIdx = 0
+	b.writerIdx = size
+}
+
+// ReadUint32 consumes 4 bytes through Read and decodes them as a big-endian uint32.
+func (b *buffer) ReadUint32() uint32 {
+ return binary.BigEndian.Uint32(b.Read(4))
+}
+
+// WriteUint32 appends n in big-endian form and advances the writer index by 4.
+// The buffer is grown when fewer than 4 writable bytes are left, because
+// PutUint32 panics on a destination shorter than 4 bytes.
+func (b *buffer) WriteUint32(n uint32) {
+	if b.WritableBytes() < 4 {
+		// Resize moves the readable data to the front, so doubling the
+		// capacity plus 4 always leaves room for the value
+		b.Resize(b.Capacity()*2 + 4)
+	}
+
+	binary.BigEndian.PutUint32(b.WritableSlice(), n)
+	b.writerIdx += 4
+}
+
+// Write appends s at the writer index and advances the writer index by len(s).
+//
+// The built-in copy never grows its destination, so when s does not fit in
+// the writable space the buffer is resized first. Otherwise the data would be
+// silently truncated while the writer index still moved by len(s).
+func (b *buffer) Write(s []byte) {
+	needed := uint32(len(s))
+	if b.WritableBytes() < needed {
+		// Grow geometrically, but always enough for the pending data plus s
+		// (Resize moves the readable bytes to the front of the new storage)
+		newSize := b.Capacity() * 2
+		if required := b.ReadableBytes() + needed; newSize < required {
+			newSize = required
+		}
+		b.Resize(newSize)
+	}
+
+	copy(b.WritableSlice(), s)
+	b.writerIdx += needed
+}
+
+// Clear resets the reader and writer index to 0. The storage is kept and not zeroed.
+func (b *buffer) Clear() {
+ b.readerIdx = 0
+ b.writerIdx = 0
+}
diff --git a/pulsar/impl/buffer_test.go b/pulsar/impl/buffer_test.go
new file mode 100644
index 0000000..1d72196
--- /dev/null
+++ b/pulsar/impl/buffer_test.go
@@ -0,0 +1,19 @@
+package impl
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+// TestBuffer checks the readable/writable/capacity accounting of a new
+// buffer, before and after writing 5 bytes into it.
+func TestBuffer(t *testing.T) {
+ b := NewBuffer(1024)
+ // A new buffer has nothing to read and its whole capacity available for writing
+ assert.Equal(t, uint32(0), b.ReadableBytes())
+ assert.Equal(t, uint32(1024), b.WritableBytes())
+ assert.Equal(t, uint32(1024), b.Capacity())
+
+ // Writing moves 5 bytes from writable to readable; the capacity is unchanged
+ b.Write([]byte("hello"))
+ assert.Equal(t, uint32(5), b.ReadableBytes())
+ assert.Equal(t, uint32(1019), b.WritableBytes())
+ assert.Equal(t, uint32(1024), b.Capacity())
+}
diff --git a/pulsar/impl/closable.go b/pulsar/impl/closable.go
new file mode 100644
index 0000000..c483901
--- /dev/null
+++ b/pulsar/impl/closable.go
@@ -0,0 +1,5 @@
+package impl
+
+// Closable is implemented by anything that can be closed, reporting an error on failure.
+type Closable interface {
+ Close() error
+}
diff --git a/pulsar/impl/commands.go b/pulsar/impl/commands.go
new file mode 100644
index 0000000..90760e1
--- /dev/null
+++ b/pulsar/impl/commands.go
@@ -0,0 +1,33 @@
+package impl
+
+import (
+ "github.com/golang/protobuf/proto"
+ log "github.com/sirupsen/logrus"
+ pb "pulsar-client-go-native/pulsar/pulsar_proto"
+)
+
+// MaxFrameSize is the maximum frame size in bytes (5 MiB). The connection
+// reader rejects incoming frames that declare a bigger size.
+const MaxFrameSize = 5 * 1024 * 1024
+
+// baseCommand wraps msg into a BaseCommand envelope of the given type,
+// storing it in the field that matches the type.
+//
+// Only CONNECT, LOOKUP and PARTITIONED_METADATA are supported. Any other type
+// is logged through log.Panic, and a msg whose concrete type does not match
+// cmdType makes the type assertion panic.
+func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand {
+	cmd := new(pb.BaseCommand)
+	cmd.Type = &cmdType
+
+	// Go switch cases never fall through, so no explicit break is needed
+	switch cmdType {
+	case pb.BaseCommand_CONNECT:
+		cmd.Connect = msg.(*pb.CommandConnect)
+	case pb.BaseCommand_LOOKUP:
+		cmd.LookupTopic = msg.(*pb.CommandLookupTopic)
+	case pb.BaseCommand_PARTITIONED_METADATA:
+		cmd.PartitionMetadata = msg.(*pb.CommandPartitionedTopicMetadata)
+	default:
+		log.Panic("Missing command type: ", cmdType)
+	}
+
+	return cmd
+}
+
diff --git a/pulsar/impl/connection.go b/pulsar/impl/connection.go
new file mode 100644
index 0000000..6379f6a
--- /dev/null
+++ b/pulsar/impl/connection.go
@@ -0,0 +1,293 @@
+package impl
+
+import (
+ "errors"
+ "github.com/golang/protobuf/proto"
+ log "github.com/sirupsen/logrus"
+ "net"
+ pb "pulsar-client-go-native/pulsar/pulsar_proto"
+ "sync"
+ "sync/atomic"
+)
+
+// Connection is the interface to a single broker connection.
+type Connection interface {
+ // Send req to the broker; callback is invoked with the response that carries the same requestId
+ SendRequest(requestId uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand))
+ // Close the connection
+ Close()
+}
+
+// connectionState tracks the lifecycle of a connection.
+type connectionState int
+
+const (
+ // State of a connection returned by newConnection
+ connectionInit connectionState = iota
+ // Declared but never assigned in this file
+ connectionConnecting
+ // Set by connect() once the TCP dial succeeded
+ connectionTcpConnected
+ // Set by doHandshake() once the broker answered with 'Connected'
+ connectionReady
+ // Set when connecting or the handshake failed, or by internalClose()
+ connectionClosed
+)
+
+// request is an outstanding command, kept until its response arrives.
+type request struct {
+ // Request id, used as the key in connection.pendingReqs
+ id uint64
+ // Command that was (or will be) written to the broker
+ cmd *pb.BaseCommand
+ // Invoked by handleResponse with the matching response
+ callback func(command *pb.BaseCommand)
+}
+
+// connection is the Connection implementation on top of a TCP socket.
+type connection struct {
+ // Mutex used by changeState/waitUntilReady; it is also the Locker of cond
+ sync.Mutex
+ // Broadcast on every state change
+ cond *sync.Cond
+ state connectionState
+
+ // Key under which the connection pool caches this connection
+ logicalAddr string
+ // Address that is actually dialed
+ physicalAddr string
+ // Nil until connect() succeeds
+ cnx net.Conn
+
+ // Scratch buffer reused by writeCommand for every outgoing frame
+ writeBuffer Buffer
+ reader *connectionReader
+
+ log *log.Entry
+
+ // Counter behind newRequestId, incremented atomically
+ requestIdGenerator uint64
+
+ // Requests consumed by the run() loop
+ incomingRequests chan *request
+ // Requests waiting for a response, keyed by request id
+ pendingReqs map[uint64]*request
+}
+
+// newConnection creates a connection in the connectionInit state. Nothing is
+// dialed until start() is called.
+//
+// logicalAddr is the key used by the connection pool; physicalAddr is the
+// address that connect() dials.
+func newConnection(logicalAddr string, physicalAddr string) *connection {
+	cnx := &connection{
+		state:        connectionInit,
+		logicalAddr:  logicalAddr,
+		physicalAddr: physicalAddr,
+		writeBuffer:  NewBuffer(4096),
+		log:          log.WithField("raddr", physicalAddr),
+		// The channel must exist before run() starts receiving from it:
+		// sends and receives on a nil channel block forever
+		incomingRequests: make(chan *request),
+		pendingReqs:      make(map[uint64]*request),
+	}
+	cnx.reader = newConnectionReader(cnx)
+	// The embedded mutex doubles as the Locker of the condition variable
+	cnx.cond = sync.NewCond(cnx)
+	return cnx
+}
+
+// start launches the goroutine that owns the connection: it dials the
+// broker, performs the handshake and then enters the run() loop. If either
+// connecting or the handshake fails, the state is changed to connectionClosed.
+func (c *connection) start() {
+	go func() {
+		if c.connect() && c.doHandshake() {
+			c.run()
+			return
+		}
+
+		c.changeState(connectionClosed)
+	}()
+}
+
+func (c *connection) connect() (ok bool) {
+ c.log.Info("Connecting to broker")
+
+ var err error
+ c.cnx, err = net.Dial("tcp", c.physicalAddr)
+ if err != nil {
+ c.log.WithError(err).Warn("Failed to connect to broker.")
+ c.internalClose()
+ return false
+ } else {
+ c.log = c.log.WithField("laddr", c.cnx.LocalAddr())
+ c.log.Info("TCP connection established")
+ c.state = connectionTcpConnected
+ return true
+ }
+}
+
+func (c *connection) doHandshake() (ok bool) {
+ // Send 'Connect' command to initiate handshake
+ version := int32(pb.ProtocolVersion_v13)
+ c.writeCommand(baseCommand(pb.BaseCommand_CONNECT, &pb.CommandConnect{
+ ProtocolVersion: &version,
+ ClientVersion: proto.String("Pulsar Go 0.1"),
+ // AuthMethodName: "token",
+ // AuthData: authData,
+ }))
+
+ cmd, _, err := c.reader.readSingleCommand()
+ if err != nil {
+ c.log.WithError(err).Warn("Failed to perform initial handshake")
+ return false
+ }
+
+ if cmd.Connected == nil {
+ c.log.Warnf("Failed to perform initial handshake - Expecting 'Connected' cmd, got '%s'",
+ cmd.Type)
+ return false
+ }
+
+ c.log.Info("Connection is ready")
+ c.changeState(connectionReady)
+ return true
+}
+
+func (c *connection) waitUntilReady() error {
+ c.Lock()
+ defer c.Unlock()
+
+ for {
+ switch c.state {
+ case connectionInit:
+ case connectionConnecting:
+ case connectionTcpConnected:
+ // Wait for the state to change
+ c.cond.Wait()
+ break
+
+ case connectionReady:
+ return nil
+
+ case connectionClosed:
+ return errors.New("connection error")
+ }
+ }
+}
+
+// run starts the reader goroutine and then loops forever: every request
+// received on incomingRequests is registered as pending and written to the
+// broker.
+func (c *connection) run() {
+	// All reads come from the reader goroutine
+	go c.reader.readFromConnection()
+
+	for {
+		req := <-c.incomingRequests
+		c.pendingReqs[req.id] = req
+		c.writeCommand(req.cmd)
+	}
+}
+
+// writeCommand serializes cmd and writes it to the socket as a single frame.
+//
+// Wire format:
+//   [FRAME_SIZE] [CMD_SIZE][CMD]
+// where both sizes are big-endian uint32 and FRAME_SIZE counts everything
+// that follows it.
+//
+// A serialization or write failure is logged and closes the connection.
+//
+// NOTE(review): the shared writeBuffer is not protected by a lock here, while
+// SendRequest can be called from several goroutines - confirm the callers
+// serialize their writes.
+func (c *connection) writeCommand(cmd proto.Message) {
+	// Serialize first, so that a failure leaves the write buffer untouched
+	// and cannot terminate the whole process from library code
+	serialized, err := proto.Marshal(cmd)
+	if err != nil {
+		c.log.WithError(err).Error("Protobuf serialization error")
+		c.internalClose()
+		return
+	}
+
+	cmdSize := uint32(len(serialized))
+	frameSize := cmdSize + 4
+	bufferSize := frameSize + 4
+
+	c.writeBuffer.Clear()
+	if c.writeBuffer.WritableBytes() < bufferSize {
+		// Doubling once is not enough for commands bigger than twice the
+		// current capacity: always grow to at least the size of the frame
+		newSize := c.writeBuffer.Capacity() * 2
+		if newSize < bufferSize {
+			newSize = bufferSize
+		}
+		c.writeBuffer.Resize(newSize)
+	}
+
+	c.writeBuffer.WriteUint32(frameSize)
+	c.writeBuffer.WriteUint32(cmdSize)
+	c.writeBuffer.Write(serialized)
+
+	if _, err := c.cnx.Write(c.writeBuffer.ReadableSlice()); err != nil {
+		c.log.WithError(err).Warn("Failed to write on connection")
+		c.internalClose()
+	}
+}
+
+// receivedCommand dispatches a command read from the broker.
+//
+// Responses that carry a request id are routed to handleResponse. The other
+// known command types are accepted but not processed yet. An unknown type is
+// logged and triggers Close().
+func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []byte) {
+	c.log.Infof("Received command: %s -- payload: %v", cmd, headersAndPayload)
+
+	switch *cmd.Type {
+	case pb.BaseCommand_SUCCESS:
+		c.handleResponse(*cmd.Success.RequestId, cmd)
+
+	case pb.BaseCommand_PRODUCER_SUCCESS:
+		c.handleResponse(*cmd.ProducerSuccess.RequestId, cmd)
+
+	case pb.BaseCommand_PARTITIONED_METADATA_RESPONSE:
+		c.handleResponse(*cmd.PartitionMetadataResponse.RequestId, cmd)
+
+	case pb.BaseCommand_LOOKUP_RESPONSE:
+		c.handleResponse(*cmd.LookupTopicResponse.RequestId, cmd)
+
+	case pb.BaseCommand_CONSUMER_STATS_RESPONSE:
+		c.handleResponse(*cmd.ConsumerStatsResponse.RequestId, cmd)
+
+	case pb.BaseCommand_GET_LAST_MESSAGE_ID_RESPONSE:
+		c.handleResponse(*cmd.GetLastMessageIdResponse.RequestId, cmd)
+
+	case pb.BaseCommand_GET_TOPICS_OF_NAMESPACE_RESPONSE:
+		c.handleResponse(*cmd.GetTopicsOfNamespaceResponse.RequestId, cmd)
+
+	case pb.BaseCommand_GET_SCHEMA_RESPONSE:
+		c.handleResponse(*cmd.GetSchemaResponse.RequestId, cmd)
+
+	case pb.BaseCommand_ERROR,
+		pb.BaseCommand_CLOSE_PRODUCER,
+		pb.BaseCommand_CLOSE_CONSUMER,
+		pb.BaseCommand_SEND_RECEIPT,
+		pb.BaseCommand_SEND_ERROR,
+		pb.BaseCommand_MESSAGE,
+		pb.BaseCommand_PONG,
+		pb.BaseCommand_ACTIVE_CONSUMER_CHANGE:
+		// TODO: recognized, but not handled yet
+
+	default:
+		c.log.Errorf("Received invalid command type: %s", cmd.Type)
+		c.Close()
+	}
+}
+
+// SendRequest registers callback as the handler of the response to requestId
+// and writes req to the broker.
+//
+// The pending-requests map is updated under the connection lock: responses
+// are processed on the reader goroutine, and concurrent access to a Go map
+// without synchronization is a data race.
+func (c *connection) SendRequest(requestId uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand)) {
+	pending := &request{
+		id:       requestId,
+		cmd:      req,
+		callback: callback,
+	}
+
+	c.Lock()
+	c.pendingReqs[requestId] = pending
+	c.Unlock()
+
+	// Write outside of the lock: writeCommand may close the connection
+	c.writeCommand(req)
+}
+
+// handleResponse completes the pending request identified by requestId by
+// invoking its callback with the response. A response without a matching
+// pending request is logged and dropped.
+func (c *connection) handleResponse(requestId uint64, response *pb.BaseCommand) {
+	// The map is shared with SendRequest, which runs on the callers' goroutines
+	c.Lock()
+	pending, ok := c.pendingReqs[requestId]
+	if ok {
+		delete(c.pendingReqs, requestId)
+	}
+	c.Unlock()
+
+	if !ok {
+		c.log.Warnf("Received unexpected response for request %d of type %s", requestId, response.Type)
+		return
+	}
+
+	// Invoke the callback outside of the lock, so that it is free to use the connection
+	pending.callback(response)
+}
+
+// Close is part of the Connection interface.
+// NOTE: not implemented yet; it currently does nothing.
+func (c *connection) Close() {
+ // TODO
+}
+
+// changeState sets the connection state under the lock and wakes up every
+// goroutine blocked in waitUntilReady().
+func (c *connection) changeState(state connectionState) {
+	c.Lock()
+	defer c.Unlock()
+
+	c.state = state
+	c.cond.Broadcast()
+}
+
+// newRequestId atomically increments the request id counter and returns the
+// new value, so the first id handed out is 1.
+func (c *connection) newRequestId() uint64 {
+ return atomic.AddUint64(&c.requestIdGenerator, 1)
+}
+
+// internalClose marks the connection as closed, wakes up the goroutines
+// waiting on the state and closes the socket, if one was opened.
+func (c *connection) internalClose() {
+ // NOTE(review): unlike changeState(), the state is written here without
+ // holding the connection lock, while waitUntilReady() reads it under the
+ // lock - confirm whether this should go through changeState().
+ c.state = connectionClosed
+ c.cond.Broadcast()
+
+ // cnx is nil when the TCP connection was never established.
+ // The error returned by Close is ignored.
+ if c.cnx != nil {
+ c.cnx.Close()
+ }
+}
diff --git a/pulsar/impl/connection_pool.go b/pulsar/impl/connection_pool.go
new file mode 100644
index 0000000..676f205
--- /dev/null
+++ b/pulsar/impl/connection_pool.go
@@ -0,0 +1,54 @@
+package impl
+
+import (
+ "sync"
+)
+
+// ConnectionPool hands out connections to brokers, caching them by logical address.
+type ConnectionPool interface {
+ // Return a ready-to-use connection for logicalAddr, dialing physicalAddr when a new one is needed
+ GetConnection(logicalAddr string, physicalAddr string) (Connection, error)
+
+ // Close all the connections in the pool
+ Close()
+}
+
+// connectionPool is the ConnectionPool implementation.
+type connectionPool struct {
+ // Maps the logical address (string) to its *connection
+ pool sync.Map
+}
+
+// NewConnectionPool returns an empty ConnectionPool.
+func NewConnectionPool() ConnectionPool {
+	// The zero value of sync.Map is ready to use
+	return new(connectionPool)
+}
+
+// GetConnection returns a ready-to-use connection for logicalAddr.
+//
+// A cached connection is reused when it is (or becomes) ready. A cached
+// connection that has failed is evicted and replaced by a new connection to
+// physicalAddr. The call blocks until the connection is ready or has failed;
+// in the latter case the error is returned.
+func (p *connectionPool) GetConnection(logicalAddr string, physicalAddr string) (Connection, error) {
+	if cachedCnx, found := p.pool.Load(logicalAddr); found {
+		cnx := cachedCnx.(*connection)
+		// A nil error means that the connection is ready to be used
+		if err := cnx.waitUntilReady(); err == nil {
+			return cnx, nil
+		}
+
+		// The cached connection is failed
+		p.pool.Delete(logicalAddr)
+	}
+
+	// Try to create a new connection. If another goroutine stored one in the
+	// meantime, that one is used and it is not started a second time.
+	newCnx, wasCached := p.pool.LoadOrStore(logicalAddr, newConnection(logicalAddr, physicalAddr))
+	cnx := newCnx.(*connection)
+	if !wasCached {
+		cnx.start()
+	}
+
+	// A failed connection stays in the pool and is evicted by the next call
+	if err := cnx.waitUntilReady(); err != nil {
+		return nil, err
+	}
+	return cnx, nil
+}
+
+// Close calls Close() on every connection stored in the pool.
+// The connections are not removed from the pool.
+func (p *connectionPool) Close() {
+	closeCnx := func(_, cnx interface{}) bool {
+		cnx.(Connection).Close()
+		// Keep iterating over the remaining entries
+		return true
+	}
+	p.pool.Range(closeCnx)
+}
diff --git a/pulsar/impl/connection_reader.go b/pulsar/impl/connection_reader.go
new file mode 100644
index 0000000..5bb87c6
--- /dev/null
+++ b/pulsar/impl/connection_reader.go
@@ -0,0 +1,117 @@
+package impl
+
+import (
+ "bufio"
+ "github.com/golang/protobuf/proto"
+ "github.com/pkg/errors"
+ "io"
+ pb "pulsar-client-go-native/pulsar/pulsar_proto"
+)
+
+// connectionReader reads and decodes the frames received on a connection.
+type connectionReader struct {
+ // Connection that owns this reader
+ cnx *connection
+ // Accumulates the bytes read from the socket until complete frames are available
+ buffer Buffer
+ // Not used by any of the methods in this file
+ reader *bufio.Reader
+}
+
+// newConnectionReader creates the reader for cnx, with a 4096-byte read buffer.
+func newConnectionReader(cnx *connection) *connectionReader {
+ return &connectionReader{
+ cnx: cnx,
+ // NOTE(review): newConnection() calls this before connect() has assigned
+ // cnx.cnx, so the bufio.Reader wraps a nil net.Conn. The read methods in
+ // this file go through r.cnx.cnx directly and never use this field.
+ reader: bufio.NewReader(cnx.cnx),
+ buffer: NewBuffer(4096),
+ }
+}
+
+// readFromConnection reads commands from the connection one at a time and
+// hands each of them to the connection for processing. It returns, after
+// closing the connection, on the first read error.
+func (r *connectionReader) readFromConnection() {
+	for {
+		cmd, payload, err := r.readSingleCommand()
+		if err != nil {
+			r.cnx.log.WithError(err).Info("Error reading from connection")
+			r.cnx.internalClose()
+			return
+		}
+
+		// Process
+		r.cnx.log.Debug("Got command! ", cmd, " with payload ", payload)
+		r.cnx.receivedCommand(cmd, payload)
+	}
+}
+
+// readSingleCommand reads one complete frame from the connection and decodes it.
+//
+// Frame layout: [FRAME_SIZE] [CMD_SIZE][CMD] [HEADERS_AND_PAYLOAD], where both
+// sizes are big-endian uint32 and FRAME_SIZE counts everything that follows it.
+//
+// It returns the decoded command together with a copy of the headers and
+// payload section, which is nil when the frame only contains the command.
+// An error is returned on a short read, on a frame with invalid sizes or when
+// the command cannot be parsed.
+func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndPayload []byte, err error) {
+	// First, we need to read the frame size
+	if available := r.buffer.ReadableBytes(); available < 4 {
+		if available == 0 {
+			// If the buffer is empty, just go back to write at the beginning
+			r.buffer.Clear()
+		}
+		// Only ask for the bytes that are still missing: requesting more
+		// could block waiting for data that belongs to a later frame
+		if !r.readAtLeast(4 - available) {
+			return nil, nil, errors.New("Short read when reading frame size")
+		}
+	}
+
+	// We have enough to read frame size
+	frameSize := r.buffer.ReadUint32()
+	if frameSize > MaxFrameSize {
+		r.cnx.log.Warnf("Received too big frame size. size=%d", frameSize)
+		r.cnx.internalClose()
+		return nil, nil, errors.New("Frame size too big")
+	}
+	if frameSize < 4 {
+		// The frame cannot even hold the command size
+		r.cnx.log.Warnf("Received too small frame size. size=%d", frameSize)
+		r.cnx.internalClose()
+		return nil, nil, errors.New("Frame size too small")
+	}
+
+	// Next, we read the rest of the frame
+	if available := r.buffer.ReadableBytes(); available < frameSize {
+		if !r.readAtLeast(frameSize - available) {
+			return nil, nil, errors.New("Short read when reading frame")
+		}
+	}
+
+	// We have now the complete frame
+	cmdSize := r.buffer.ReadUint32()
+	if cmdSize > frameSize-4 {
+		// Reading the command would run past the end of the frame
+		r.cnx.log.Warnf("Received invalid command size. cmdSize=%d frameSize=%d", cmdSize, frameSize)
+		r.cnx.internalClose()
+		return nil, nil, errors.New("Command size bigger than frame size")
+	}
+
+	cmd, err = r.deserializeCmd(r.buffer.Read(cmdSize))
+	if err != nil {
+		return nil, nil, err
+	}
+
+	// Also read the eventual payload. It is copied out, because the read
+	// buffer is reused for the next frames.
+	if headersAndPayloadSize := frameSize - (cmdSize + 4); headersAndPayloadSize > 0 {
+		headersAndPayload = make([]byte, headersAndPayloadSize)
+		copy(headersAndPayload, r.buffer.Read(headersAndPayloadSize))
+	}
+	return cmd, headersAndPayload, nil
+}
+
+// readAtLeast reads at least `size` more bytes from the socket into the read
+// buffer, first making room for them if needed. It returns false, after
+// closing the connection, when the read fails.
+func (r *connectionReader) readAtLeast(size uint32) (ok bool) {
+	if r.buffer.WritableBytes() < size {
+		// There's not enough room in the current buffer to read the requested amount of data
+		switch required := r.buffer.ReadableBytes() + size; {
+		case required > r.buffer.Capacity():
+			// Resize to a bigger buffer to avoid continuous resizing
+			r.buffer.Resize(required * 2)
+		default:
+			// Compact the buffer by moving the partial data to the beginning.
+			// This will have enough room for reading the remainder of the data
+			r.buffer.MoveToFront()
+		}
+	}
+
+	read, err := io.ReadAtLeast(r.cnx.cnx, r.buffer.WritableSlice(), int(size))
+	if err != nil {
+		r.cnx.internalClose()
+		return false
+	}
+
+	// Make the bytes that were just read visible to the readers of the buffer
+	r.buffer.WrittenBytes(uint32(read))
+	return true
+}
+
+// deserializeCmd parses data as a protobuf BaseCommand. On a parse failure
+// the connection is closed and the error is returned.
+func (r *connectionReader) deserializeCmd(data []byte) (*pb.BaseCommand, error) {
+	cmd := new(pb.BaseCommand)
+	if err := proto.Unmarshal(data, cmd); err != nil {
+		r.cnx.log.WithError(err).Warn("Failed to parse protobuf command")
+		r.cnx.internalClose()
+		return nil, err
+	}
+
+	return cmd, nil
+}
diff --git a/pulsar/impl/default_router.go b/pulsar/impl/default_router.go
new file mode 100644
index 0000000..bb5ca73
--- /dev/null
+++ b/pulsar/impl/default_router.go
@@ -0,0 +1,26 @@
+package impl
+
+import (
+ "time"
+)
+
+// defaultRouter groups the state intended for the default message router.
+// NOTE(review): nothing in the visible code instantiates this type yet; its only
+// construction site is commented out in NewDefaultRouter.
+type defaultRouter struct {
+ partitionIdx uint32
+ maxBatchingDelay time.Duration
+}
+
+// NewDefaultRouter returns the default routing function, which maps the number
+// of partitions of a topic to the index of the partition to publish on.
+//
+// Only the single-partition case is handled so far: the returned function
+// yields 0 when numPartitions is 1 and -1 for any other value. The
+// maxBatchingDelay argument is not used yet.
+func NewDefaultRouter(maxBatchingDelay time.Duration) func(uint32) int {
+	// TODO: XXX
+	//state := &defaultRouter{
+	//	partitionIdx:     rand.Uint32(),
+	//	maxBatchingDelay: maxBatchingDelay,
+	//}
+
+	return func(numPartitions uint32) int {
+		if numPartitions != 1 {
+			// Routing across several partitions is not implemented yet
+			return -1
+		}
+
+		return 0
+	}
+}
diff --git a/pulsar/impl/lookup_service.go b/pulsar/impl/lookup_service.go
new file mode 100644
index 0000000..0ebf424
--- /dev/null
+++ b/pulsar/impl/lookup_service.go
@@ -0,0 +1,62 @@
+package impl
+
+import (
+	"errors"
+	"net/url"
+
+	log "github.com/sirupsen/logrus"
+	pb "pulsar-client-go-native/pulsar/pulsar_proto"
+)
+
+// LookupResult carries the pair of broker addresses produced by a topic lookup.
+// Callers pass the Host of each address as the logical and physical address
+// when requesting a connection.
+type LookupResult struct {
+ LogicalAddr *url.URL
+ PhysicalAddr *url.URL
+}
+
+// LookupService resolves the broker to contact for a given topic.
+type LookupService interface {
+ // Lookup performs the lookup for the given topic name
+ Lookup(topic string) (*LookupResult, error)
+}
+
+// lookupService is the LookupService implementation that issues lookup
+// commands through an RpcClient.
+type lookupService struct {
+ rpcClient RpcClient
+}
+
+// NewLookupService creates a LookupService that sends its lookup requests
+// through the given RpcClient.
+func NewLookupService(rpcClient RpcClient) LookupService {
+	ls := new(lookupService)
+	ls.rpcClient = rpcClient
+	return ls
+}
+
+// Lookup asks a broker which broker is serving `topic`.
+//
+// The request is repeated up to 20 times while the broker keeps answering with
+// a redirect. It returns:
+//   - a LookupResult whose addresses are parsed from the broker service URL
+//     when the broker answers with "connect";
+//   - an error when the request itself fails, the response is incomplete, the
+//     broker reports a failure, or the redirect limit is exhausted.
+//
+// Every outcome other than a request error used to return (nil, nil), which
+// made callers dereference a nil result.
+func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
+	// Follow brokers redirect up to certain number of times
+	for i := 0; i < 20; i++ {
+		id := ls.rpcClient.NewRequestId()
+		res, err := ls.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_LOOKUP, &pb.CommandLookupTopic{
+			RequestId: &id,
+			Topic:     &topic,
+		})
+		if err != nil {
+			return nil, err
+		}
+
+		// Guard the pointer chain that is dereferenced below
+		if res == nil || res.Response == nil || res.Response.LookupTopicResponse == nil ||
+			res.Response.LookupTopicResponse.Response == nil {
+			return nil, errors.New("incomplete lookup response for topic " + topic)
+		}
+
+		lr := res.Response.LookupTopicResponse
+		log.Infof("Got lookup response: %v", lr)
+
+		switch *lr.Response {
+		case pb.CommandLookupTopicResponse_Redirect:
+			// NOTE(review): the redirect target is only logged; the next attempt is
+			// still sent through RequestToAnyBroker rather than to the broker
+			// indicated in the response.
+			log.WithField("topic", topic).Infof("Follow redirect to broker. %v / %v - Use proxy: %v",
+				lr.GetBrokerServiceUrl(), lr.GetBrokerServiceUrlTls(), lr.GetProxyThroughServiceUrl())
+
+		case pb.CommandLookupTopicResponse_Connect:
+			log.WithField("topic", topic).Infof("Successfully looked up topic on broker. %s / %s - Use proxy: %t",
+				lr.GetBrokerServiceUrl(), lr.GetBrokerServiceUrlTls(), lr.GetProxyThroughServiceUrl())
+
+			brokerAddr, err := url.Parse(lr.GetBrokerServiceUrl())
+			if err != nil {
+				return nil, err
+			}
+			if brokerAddr.Host == "" {
+				return nil, errors.New("lookup response for topic " + topic + " has no broker address")
+			}
+
+			// NOTE(review): when the response asks to proxy through the service URL the
+			// physical address should presumably be the service URL instead; it is not
+			// available to this type, so both addresses point at the broker. Confirm.
+			return &LookupResult{
+				LogicalAddr:  brokerAddr,
+				PhysicalAddr: brokerAddr,
+			}, nil
+
+		case pb.CommandLookupTopicResponse_Failed:
+			log.WithField("topic", topic).Warnf("Failed to lookup topic: %s", lr.GetError().String())
+			return nil, errors.New("failed to lookup topic " + topic + ": " + lr.GetError().String())
+
+		default:
+			return nil, errors.New("unexpected lookup response type for topic " + topic)
+		}
+	}
+
+	return nil, errors.New("too many redirects while looking up topic " + topic)
+}
diff --git a/pulsar/impl/rpc_client.go b/pulsar/impl/rpc_client.go
new file mode 100644
index 0000000..fd76569
--- /dev/null
+++ b/pulsar/impl/rpc_client.go
@@ -0,0 +1,71 @@
+package impl
+
+import (
+ "github.com/golang/protobuf/proto"
+ "net/url"
+ pb "pulsar-client-go-native/pulsar/pulsar_proto"
+ "sync"
+ "sync/atomic"
+)
+
+// RpcResult is the outcome of a completed request: the command received as
+// response and the connection the request was sent on.
+type RpcResult struct {
+ Response *pb.BaseCommand
+ Cnx Connection
+}
+
+// RpcClient sends request commands to brokers and waits for their responses.
+type RpcClient interface {
+ // Create a new unique request id
+ NewRequestId() uint64
+
+ // Send a request and block until the result is available
+ RequestToAnyBroker(requestId uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RpcResult, error)
+
+ // Send a request on the connection identified by the given logical and physical
+ // addresses and block until the result is available
+ Request(logicalAddr string, physicalAddr string, requestId uint64,
+ cmdType pb.BaseCommand_Type, message proto.Message) (*RpcResult, error)
+}
+
+// rpcClient is the RpcClient implementation backed by a ConnectionPool.
+type rpcClient struct {
+ // Host part of the service URL, used as target by RequestToAnyBroker
+ hostPort string
+ // Pool the connections are obtained from
+ pool ConnectionPool
+ // Counter incremented atomically by NewRequestId
+ requestIdGenerator uint64
+}
+
+// NewRpcClient creates an RpcClient that takes its connections from `pool` and
+// uses the host of `serviceUrl` as the address for RequestToAnyBroker.
+func NewRpcClient(serviceUrl *url.URL, pool ConnectionPool) RpcClient {
+	c := new(rpcClient)
+	c.hostPort = serviceUrl.Host
+	c.pool = pool
+	return c
+}
+
+// RequestToAnyBroker sends the request to the service URL host, which is used
+// as both the logical and the physical address, and blocks until the result is
+// available.
+func (c *rpcClient) RequestToAnyBroker(requestId uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RpcResult, error) {
+	serviceAddr := c.hostPort
+	return c.Request(serviceAddr, serviceAddr, requestId, cmdType, message)
+}
+
+func (c *rpcClient) Request(logicalAddr string, physicalAddr string, requestId uint64,
+ cmdType pb.BaseCommand_Type, message proto.Message) (*RpcResult, error) {
+ // TODO: Add retry logic in case of connection issues
+ cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
+ if err != nil {
+ return nil, err
+ }
+
+ wg := sync.WaitGroup{}
+ wg.Add(1)
+
+ rpcResult := &RpcResult{
+ Cnx: cnx,
+ }
+
+ // TODO: Handle errors with disconnections
+ cnx.SendRequest(requestId, baseCommand(cmdType, message), func(response *pb.BaseCommand) {
+ rpcResult.Response = response
+ wg.Done()
+ })
+
+ wg.Wait()
+ return rpcResult, nil
+}
+
+// NewRequestId atomically increments the request id counter and returns the
+// new value.
+func (c *rpcClient) NewRequestId() uint64 {
+	next := atomic.AddUint64(&c.requestIdGenerator, 1)
+	return next
+}
diff --git a/pulsar/impl/topic_name.go b/pulsar/impl/topic_name.go
new file mode 100644
index 0000000..2bafeb2
--- /dev/null
+++ b/pulsar/impl/topic_name.go
@@ -0,0 +1,88 @@
+package impl
+
+import (
+ "errors"
+ "fmt"
+ "strconv"
+ "strings"
+)
+
+// TopicName is the parsed form of a topic name, as produced by ParseTopicName.
+type TopicName struct {
+ // Fully qualified topic name, including the domain prefix
+ Name string
+ // Namespace part of the name: tenant/namespace, or tenant/cluster/namespace
+ // for legacy names
+ Namespace string
+ // Partition index parsed from the name, or -1 when the name does not contain
+ // the partition suffix
+ Partition int
+}
+
+const (
+ // Tenant used to expand a short topic name
+ publicTenant = "public"
+ // Namespace used to expand a short topic name
+ defaultNamespace = "default"
+ // Marker that precedes the partition index in the name of a partition
+ partitionedTopicSuffix = "-partition-"
+)
+
+// ParseTopicName parses a topic name given either in short form or as a fully
+// qualified name.
+//
+// Accepted short forms are `<topic>` (expanded under public/default) and
+// `<tenant>/<namespace>/<topic>`; both are expanded to the `persistent` domain.
+// Fully qualified names must use the `persistent` or `non-persistent` domain
+// and follow one of:
+//
+//	new:    persistent://tenant/namespace/topic
+//	legacy: persistent://tenant/cluster/namespace/topic
+//
+// An error is returned for any other shape, or when the name contains the
+// partition suffix but does not end with a numeric index.
+func ParseTopicName(topic string) (*TopicName, error) {
+	// Expand a short topic name into its fully qualified form first
+	if !strings.Contains(topic, "://") {
+		switch shortParts := strings.Split(topic, "/"); len(shortParts) {
+		case 3:
+			topic = "persistent://" + topic
+		case 1:
+			topic = "persistent://" + publicTenant + "/" + defaultNamespace + "/" + shortParts[0]
+		default:
+			return nil, errors.New(
+				"Invalid short topic name '" + topic +
+					"', it should be in the format of <tenant>/<namespace>/<topic> or <topic>")
+		}
+	}
+
+	// At this point the name always contains "://", so the split yields two parts
+	domainAndRest := strings.SplitN(topic, "://", 2)
+	domain, rest := domainAndRest[0], domainAndRest[1]
+	if domain != "persistent" && domain != "non-persistent" {
+		return nil, errors.New("Invalid topic domain: " + domain)
+	}
+
+	// The rest of the name can be in different forms:
+	//   new:    tenant/namespace/<localName>
+	//   legacy: tenant/cluster/namespace/<localName>
+	// Examples of localName:
+	//   1. some/name/xyz//
+	//   2. /xyz-123/feeder-2
+	var namespace string
+	switch segments := strings.SplitN(rest, "/", 4); len(segments) {
+	case 3:
+		// New topic name without cluster name
+		namespace = segments[0] + "/" + segments[1]
+	case 4:
+		// Legacy topic name that includes cluster name
+		namespace = fmt.Sprintf("%s/%s/%s", segments[0], segments[1], segments[2])
+	default:
+		return nil, errors.New("Invalid topic name: " + topic)
+	}
+
+	partition, err := getPartitionIndex(topic)
+	if err != nil {
+		return nil, err
+	}
+
+	return &TopicName{
+		Name:      topic,
+		Namespace: namespace,
+		Partition: partition,
+	}, nil
+}
+
+// getPartitionIndex extracts the partition index from a topic name.
+//
+// When the name does not contain the partition suffix it returns -1 and no
+// error. Otherwise the text after the last "-" is parsed as an integer and the
+// result of that conversion, including its error, is returned.
+func getPartitionIndex(topic string) (int, error) {
+	if !strings.Contains(topic, partitionedTopicSuffix) {
+		return -1, nil
+	}
+
+	indexStart := strings.LastIndex(topic, "-") + 1
+	return strconv.Atoi(topic[indexStart:])
+}
diff --git a/pulsar/impl/topic_name_test.go b/pulsar/impl/topic_name_test.go
new file mode 100644
index 0000000..b4d6184
--- /dev/null
+++ b/pulsar/impl/topic_name_test.go
@@ -0,0 +1,68 @@
+package impl
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+// TestParseTopicName checks the name, namespace and partition index produced
+// for each accepted topic name form.
+func TestParseTopicName(t *testing.T) {
+	cases := []struct {
+		input     string
+		name      string
+		namespace string
+		partition int
+	}{
+		{"persistent://my-tenant/my-ns/my-topic",
+			"persistent://my-tenant/my-ns/my-topic", "my-tenant/my-ns", -1},
+		{"my-topic",
+			"persistent://public/default/my-topic", "public/default", -1},
+		{"my-tenant/my-namespace/my-topic",
+			"persistent://my-tenant/my-namespace/my-topic", "my-tenant/my-namespace", -1},
+		{"non-persistent://my-tenant/my-namespace/my-topic",
+			"non-persistent://my-tenant/my-namespace/my-topic", "my-tenant/my-namespace", -1},
+		{"my-topic-partition-5",
+			"persistent://public/default/my-topic-partition-5", "public/default", 5},
+		// V1 topic name
+		{"persistent://my-tenant/my-cluster/my-ns/my-topic",
+			"persistent://my-tenant/my-cluster/my-ns/my-topic", "my-tenant/my-cluster/my-ns", -1},
+	}
+
+	for _, tc := range cases {
+		topic, err := ParseTopicName(tc.input)
+		assert.Nil(t, err)
+		assert.Equal(t, tc.name, topic.Name)
+		assert.Equal(t, tc.namespace, topic.Namespace)
+		assert.Equal(t, tc.partition, topic.Partition)
+	}
+}
+
+// TestParseTopicNameErrors checks that malformed topic names are rejected.
+func TestParseTopicNameErrors(t *testing.T) {
+	invalidNames := []string{
+		"invalid://my-tenant/my-ns/my-topic",
+		"invalid://my-tenant/my-ns/my-topic-partition-xyz",
+		"my-tenant/my-ns/my-topic-partition-xyz/invalid",
+		"persistent://my-tenant",
+		"persistent://my-tenant/my-namespace",
+		"persistent://my-tenant/my-cluster/my-ns/my-topic-partition-xyz/invalid",
+	}
+
+	for _, name := range invalidNames {
+		_, err := ParseTopicName(name)
+		assert.NotNil(t, err)
+	}
+}
diff --git a/pulsar/impl_client.go b/pulsar/impl_client.go
new file mode 100644
index 0000000..e143601
--- /dev/null
+++ b/pulsar/impl_client.go
@@ -0,0 +1,98 @@
+package pulsar
+
+import (
+ "fmt"
+ log "github.com/sirupsen/logrus"
+ "net/url"
+ "pulsar-client-go-native/pulsar/impl"
+ pb "pulsar-client-go-native/pulsar/pulsar_proto"
+)
+
+// client is the Client implementation returned by newClient.
+type client struct {
+ options ClientOptions
+
+ // Connection pool handed to the RPC client
+ cnxPool impl.ConnectionPool
+ rpcClient impl.RpcClient
+ lookupService impl.LookupService
+
+ // Set of the handlers (e.g. producers) created through this client; they are
+ // closed by Close
+ handlers map[impl.Closable]bool
+}
+
+// newClient validates the options and builds a client together with its
+// connection pool, RPC client and lookup service.
+//
+// It returns an InvalidConfiguration error when the URL is empty, cannot be
+// parsed, or does not use the `pulsar` scheme.
+func newClient(options ClientOptions) (Client, error) {
+	if options.URL == "" {
+		return nil, newError(InvalidConfiguration, "URL is required for client")
+	}
+
+	// Named serviceURL so that the `url` package is not shadowed
+	serviceURL, err := url.Parse(options.URL)
+	if err != nil {
+		log.WithError(err).Error("Failed to parse service URL")
+		return nil, newError(InvalidConfiguration, "Invalid service URL")
+	}
+
+	if serviceURL.Scheme != "pulsar" {
+		return nil, newError(InvalidConfiguration, fmt.Sprintf("Invalid URL scheme '%s'", serviceURL.Scheme))
+	}
+
+	c := &client{
+		options: options,
+		cnxPool: impl.NewConnectionPool(),
+		// Must be allocated here: CreateProducer writes into this map and a write
+		// to a nil map panics.
+		handlers: make(map[impl.Closable]bool),
+	}
+	c.rpcClient = impl.NewRpcClient(serviceURL, c.cnxPool)
+	c.lookupService = impl.NewLookupService(c.rpcClient)
+	return c, nil
+}
+
+// CreateProducer creates a producer with the given options and registers it
+// among the handlers that Close will close.
+//
+// On failure it returns a nil Producer together with the error.
+func (client *client) CreateProducer(options ProducerOptions) (Producer, error) {
+	p, err := newProducer(client, options)
+	if err != nil {
+		// Return an untyped nil: returning the nil *producer would yield a
+		// non-nil Producer interface value.
+		return nil, err
+	}
+
+	if client.handlers == nil {
+		// Writing to a nil map panics
+		client.handlers = make(map[impl.Closable]bool)
+	}
+	client.handlers[p] = true
+	return p, nil
+}
+
+// Subscribe is not implemented yet: it ignores the options and returns a nil
+// Consumer together with a nil error.
+func (client *client) Subscribe(options ConsumerOptions) (Consumer, error) {
+ // TODO: Implement consumer
+ return nil, nil
+}
+
+// CreateReader is not implemented yet: it ignores the options and returns a nil
+// Reader together with a nil error.
+func (client *client) CreateReader(options ReaderOptions) (Reader, error) {
+ // TODO: Implement reader
+ return nil, nil
+}
+
+// TopicPartitions fetches the partitioned-topic metadata for `topic` and
+// returns the names of its partitions.
+//
+// For a partitioned topic the result has one `<topic>-partition-<i>` entry per
+// partition. For a non-partitioned topic (zero partitions reported) the result
+// contains the fully qualified topic name itself, so callers always get at
+// least one name to work with.
+//
+// An error is returned when the topic name is invalid, the request fails, the
+// response carries no metadata, or the broker reports an error.
+func (client *client) TopicPartitions(topic string) ([]string, error) {
+	topicName, err := impl.ParseTopicName(topic)
+	if err != nil {
+		return nil, err
+	}
+
+	id := client.rpcClient.NewRequestId()
+	res, err := client.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_PARTITIONED_METADATA,
+		&pb.CommandPartitionedTopicMetadata{
+			RequestId: &id,
+			Topic:     &topicName.Name,
+		})
+	if err != nil {
+		return nil, err
+	}
+
+	// Guard the pointer chain that is dereferenced below
+	if res == nil || res.Response == nil || res.Response.PartitionMetadataResponse == nil {
+		return nil, newError(LookupError, "Missing partitioned topic metadata in response")
+	}
+
+	r := res.Response.PartitionMetadataResponse
+	if r.Error != nil {
+		return nil, newError(LookupError, r.GetError().String())
+	}
+
+	numPartitions := int(r.GetPartitions())
+	if numPartitions == 0 {
+		// Non-partitioned topic
+		return []string{topicName.Name}, nil
+	}
+
+	partitions := make([]string, numPartitions)
+	for i := range partitions {
+		partitions[i] = fmt.Sprintf("%s-partition-%d", topic, i)
+	}
+	return partitions, nil
+}
+
+func (client *client) Close() error {
+ for handler := range client.handlers {
+ if err := handler.Close(); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
new file mode 100644
index 0000000..bb96a27
--- /dev/null
+++ b/pulsar/impl_partition_producer.go
@@ -0,0 +1,88 @@
+package pulsar
+
+import (
+ "context"
+ log "github.com/sirupsen/logrus"
+ "pulsar-client-go-native/pulsar/impl"
+ pb "pulsar-client-go-native/pulsar/pulsar_proto"
+ "sync"
+)
+
+// partitionProducer is the producer for a single topic (one partition of a
+// partitioned topic).
+type partitionProducer struct {
+ // Client used for the topic lookup, requests and connections
+ client *client
+ // Topic this producer publishes on
+ topic string
+ // Logger carrying the topic as a field
+ log *log.Entry
+ // NOTE(review): mutex and cond are not used by any visible code yet
+ mutex sync.Mutex
+ cond *sync.Cond
+}
+
+// newPartitionProducer creates the producer for a single topic and connects it
+// to the broker through grabCnx.
+//
+// It returns the error reported by grabCnx when the producer cannot be set up.
+// The options argument is not used yet.
+func newPartitionProducer(client *client, topic string, options *ProducerOptions) (*partitionProducer, error) {
+	p := &partitionProducer{
+		// grabCnx reads both client and topic, so they must be stored here
+		client: client,
+		topic:  topic,
+		log:    log.WithField("topic", topic),
+	}
+
+	if err := p.grabCnx(); err != nil {
+		p.log.WithError(err).Errorf("Failed to create producer")
+		return nil, err
+	}
+
+	p.log.Info("Created producer")
+	return p, nil
+}
+
+// grabCnx looks up the broker serving the topic, sends it the producer
+// creation command and obtains the connection to that broker.
+//
+// It returns an error when the lookup fails or yields no broker address, when
+// the request fails, or when the connection cannot be obtained.
+//
+// The original body did not compile (invalid `*pb.CommandProducer{}` literal
+// and an unfinished `cnx.` statement); this version keeps the same sequence of
+// calls and leaves the unfinished parts as explicit TODOs.
+func (p *partitionProducer) grabCnx() error {
+	lr, err := p.client.lookupService.Lookup(p.topic)
+	if err != nil {
+		p.log.WithError(err).Warn("Failed to lookup topic")
+		return err
+	}
+	if lr == nil || lr.LogicalAddr == nil || lr.PhysicalAddr == nil {
+		// Both addresses are dereferenced below
+		p.log.Warn("Lookup returned no broker address")
+		return newError(LookupError, "Lookup returned no broker address for topic "+p.topic)
+	}
+
+	id := p.client.rpcClient.NewRequestId()
+	// NOTE(review): the producer command presumably also needs a producer id and
+	// other settings; nothing in this package generates a producer id yet.
+	// Confirm against the protocol definition.
+	_, err = p.client.rpcClient.Request(lr.LogicalAddr.Host, lr.PhysicalAddr.Host, id, pb.BaseCommand_PRODUCER,
+		&pb.CommandProducer{
+			RequestId: &id,
+			Topic:     &p.topic,
+		})
+	if err != nil {
+		p.log.WithError(err).Warn("Failed to send producer command")
+		return err
+	}
+
+	var cnx impl.Connection
+	cnx, err = p.client.cnxPool.GetConnection(lr.LogicalAddr.Host, lr.PhysicalAddr.Host)
+	if err != nil {
+		p.log.WithError(err).Warn("Failed to get connection")
+		return err
+	}
+
+	// TODO: the original code stopped mid-statement here; the connection is not
+	// used for anything yet.
+	_ = cnx
+
+	return nil
+}
+
+// run is an empty placeholder for now.
+func (p *partitionProducer) run() {
+
+}
+
+// Topic returns the topic this producer is publishing on.
+func (p *partitionProducer) Topic() string {
+	// Used to return an empty string even though the topic is kept in the struct
+	return p.topic
+}
+
+// Name is a stub for now: it always returns an empty string.
+func (p *partitionProducer) Name() string {
+ return ""
+}
+
+// Send is a stub for now: it ignores its arguments and returns nil without
+// publishing anything.
+func (p *partitionProducer) Send(context.Context, ProducerMessage) error {
+ return nil
+}
+
+// SendAsync is a stub for now: it does nothing.
+// NOTE(review): the callback is never invoked, so a caller that waits for it
+// will block forever.
+func (p *partitionProducer) SendAsync(context.Context, ProducerMessage, func(ProducerMessage, error)) {
+}
+
+// LastSequenceID is a stub for now: it always returns -1.
+func (p *partitionProducer) LastSequenceID() int64 {
+ return -1
+}
+
+// Flush is a stub for now: it always returns nil.
+func (p *partitionProducer) Flush() error {
+ return nil
+}
+
+// Close is a stub for now: it releases nothing and always returns nil.
+func (p *partitionProducer) Close() error {
+ return nil
+}
diff --git a/pulsar/impl_producer.go b/pulsar/impl_producer.go
new file mode 100644
index 0000000..f2332ea
--- /dev/null
+++ b/pulsar/impl_producer.go
@@ -0,0 +1,123 @@
+package pulsar
+
+import (
+ "context"
+ "pulsar-client-go-native/pulsar/impl"
+ "sync"
+)
+
+// producer is the Producer implementation that publishes on a topic through
+// one underlying producer per partition.
+type producer struct {
+ // Topic name as given in the producer options
+ topic string
+ // One producer per partition, indexed by partition
+ producers []Producer
+ // Returns the index in `producers` to use for a given message
+ messageRouter func(ProducerMessage, TopicMetadata) int
+}
+
+// newProducer creates a producer for options.Topic, with one partition
+// producer for each name returned by client.TopicPartitions. The partition
+// producers are created concurrently.
+//
+// It returns an InvalidTopicName error when the topic is empty, the error from
+// TopicPartitions when the partitions cannot be fetched, and otherwise the
+// first error reported by a partition producer. On failure, every partition
+// producer that was created is closed before returning.
+func newProducer(client *client, options ProducerOptions) (*producer, error) {
+	if options.Topic == "" {
+		return nil, newError(InvalidTopicName, "Topic name is required for producer")
+	}
+
+	p := &producer{
+		topic: options.Topic,
+	}
+
+	// NOTE(review): a user supplied options.MessageRouter is not installed; its
+	// signature takes a Message while the internal router takes a
+	// ProducerMessage, so it cannot be assigned as is.
+	if options.MessageRouter == nil {
+		internalRouter := impl.NewDefaultRouter(options.BatchingMaxPublishDelay)
+		p.messageRouter = func(message ProducerMessage, metadata TopicMetadata) int {
+			return internalRouter(metadata.NumPartitions())
+		}
+	}
+
+	partitions, err := client.TopicPartitions(options.Topic)
+	if err != nil {
+		return nil, err
+	}
+
+	numPartitions := len(partitions)
+	p.producers = make([]Producer, numPartitions)
+
+	type producerResult struct {
+		partition int
+		prod      *partitionProducer
+		err       error
+	}
+
+	// Buffered so that no goroutine blocks on sending its result
+	results := make(chan producerResult, numPartitions)
+
+	for i := 0; i < numPartitions; i++ {
+		partition := i
+		go func() {
+			prod, err := newPartitionProducer(client, partitions[partition], &options)
+			results <- producerResult{partition, prod, err}
+		}()
+	}
+
+	// Remember the first failure; a later success must not overwrite it
+	var firstErr error
+	for i := 0; i < numPartitions; i++ {
+		res := <-results
+		if res.err != nil {
+			if firstErr == nil {
+				firstErr = res.err
+			}
+			// Leave the slot as a nil interface rather than storing a typed nil
+			continue
+		}
+		p.producers[res.partition] = res.prod
+	}
+
+	if firstErr != nil {
+		// Since there were some failures, cleanup all the partitions that succeeded in creating the producers
+		for _, created := range p.producers {
+			if created != nil {
+				// Best-effort cleanup: the creation error is the one reported
+				_ = created.Close()
+			}
+		}
+		return nil, firstErr
+	}
+
+	return p, nil
+}
+
+// Topic returns the topic name the producer was created with.
+func (p *producer) Topic() string {
+ return p.topic
+}
+
+// Name returns the name reported by the producer of the first partition, or an
+// empty string when there is no partition producer.
+func (p *producer) Name() string {
+	// Guard against indexing an empty slice or calling through a nil interface
+	if len(p.producers) == 0 || p.producers[0] == nil {
+		return ""
+	}
+	return p.producers[0].Name()
+}
+
+// NumPartitions returns the number of partition producers held by this
+// producer.
+func (p *producer) NumPartitions() uint32 {
+	count := len(p.producers)
+	return uint32(count)
+}
+
+// Send publishes the message through SendAsync and blocks until the callback
+// has been invoked, returning the error it reported.
+func (p *producer) Send(ctx context.Context, msg ProducerMessage) error {
+	var (
+		sendErr error
+		done    sync.WaitGroup
+	)
+	done.Add(1)
+
+	onComplete := func(message ProducerMessage, e error) {
+		sendErr = e
+		done.Done()
+	}
+	p.SendAsync(ctx, msg, onComplete)
+
+	done.Wait()
+	return sendErr
+}
+
+// SendAsync asks the message router for a partition and forwards the message
+// to the producer of that partition.
+//
+// When no router is configured, or the router returns an index outside the
+// range of the partition producers (the default router returns -1 when there
+// is more than one partition), the callback is invoked with an error instead
+// of panicking on the slice access.
+func (p *producer) SendAsync(ctx context.Context, msg ProducerMessage, callback func(ProducerMessage, error)) {
+	if p.messageRouter == nil {
+		if callback != nil {
+			callback(msg, newError(InvalidConfiguration, "No message router configured for producer"))
+		}
+		return
+	}
+
+	partition := p.messageRouter(msg, p)
+	if partition < 0 || partition >= len(p.producers) || p.producers[partition] == nil {
+		if callback != nil {
+			callback(msg, newError(InvalidConfiguration, "Message router returned an invalid partition index"))
+		}
+		return
+	}
+
+	p.producers[partition].SendAsync(ctx, msg, callback)
+}
+
+func (p *producer) LastSequenceID() int64 {
+ var maxSeq int64 = -1
+ for _, pp := range p.producers {
+ s := pp.LastSequenceID()
+ if s > maxSeq {
+ maxSeq = s
+ }
+ }
+ return maxSeq
+}
+
+func (p *producer) Flush() error {
+ return nil
+}
+
+func (p *producer) Close() error {
+ return nil
+}
diff --git a/pulsar/message.go b/pulsar/message.go
new file mode 100644
index 0000000..b829b96
--- /dev/null
+++ b/pulsar/message.go
@@ -0,0 +1,90 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import "time"
+
+// ProducerMessage describes a message to be published by a Producer.
+type ProducerMessage struct {
+ // Payload for the message
+ Payload []byte
+
+ // Sets the key of the message for routing policy
+ Key string
+
+ // Attach application defined properties on the message
+ Properties map[string]string
+
+ // Set the event time for a given message
+ EventTime time.Time
+
+ // Override the replication clusters for this message.
+ ReplicationClusters []string
+
+ // Set the sequence id to assign to the current message
+ SequenceID int64
+}
+
+// Message is a message received from a topic.
+type Message interface {
+ // Get the topic from which this message originated
+ Topic() string
+
+ // Return the properties attached to the message.
+ // Properties are application defined key/value pairs that will be attached to the message
+ Properties() map[string]string
+
+ // Get the payload of the message
+ Payload() []byte
+
+ // Get the unique message ID associated with this message.
+ // The message id can be used to unambiguously refer to a message without having to keep the entire payload
+ // in memory.
+ ID() MessageID
+
+ // Get the publish time of this message. The publish time is the timestamp at which a client published the
+ // message.
+ PublishTime() time.Time
+
+ // Get the event time associated with this message. It is typically set by the applications via
+ // `ProducerMessage.EventTime`.
+ // If there isn't any event time associated with this message, it will be nil.
+ EventTime() *time.Time
+
+ // Get the key of the message, if any
+ Key() string
+}
+
+// MessageID is the identifier for a particular message
+type MessageID interface {
+ // Serialize the message id into a sequence of bytes that can be stored somewhere else
+ Serialize() []byte
+}
+
+// DeserializeMessageID is meant to reconstruct a MessageID object from its serialized representation.
+// It is not implemented yet: it ignores `data` and always returns nil.
+func DeserializeMessageID(data []byte) MessageID {
+ // TODO
+ //return deserializeMessageId(data)
+ return nil
+}
+
+var (
+ // MessageID that points to the earliest message available in a topic
+ //EarliestMessage MessageID = earliestMessageID()
+
+ // MessageID that points to the latest message
+ //LatestMessage MessageID = latestMessageID()
+)
diff --git a/pulsar/producer.go b/pulsar/producer.go
new file mode 100644
index 0000000..90769cb
--- /dev/null
+++ b/pulsar/producer.go
@@ -0,0 +1,187 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import (
+ "context"
+ "time"
+)
+
+// MessageRoutingMode selects how messages are distributed across the partitions of a topic.
+type MessageRoutingMode int
+
+const (
+ // Publish messages across all partitions in round-robin.
+ RoundRobinDistribution MessageRoutingMode = iota
+
+ // The producer will choose one single partition and publish all the messages into that partition
+ UseSinglePartition
+
+ // Use custom message router implementation that will be called to determine the partition for a particular message.
+ CustomPartition
+)
+
+// HashingScheme selects the hashing function used to choose a partition.
+type HashingScheme int
+
+const (
+ JavaStringHash HashingScheme = iota // Java String.hashCode() equivalent
+ Murmur3_32Hash // Use Murmur3 hashing function
+ BoostHash // C++ based boost::hash
+)
+
+// CompressionType selects the compression applied by the producer.
+type CompressionType int
+
+const (
+ // Do not compress
+ NoCompression CompressionType = iota
+ LZ4
+ ZLib
+ ZSTD
+)
+
+// TopicMetadata exposes the topic information handed to a message router.
+type TopicMetadata interface {
+ // Get the number of partitions for the specific topic
+ NumPartitions() uint32
+}
+
+// ProducerOptions holds the configuration used to create a Producer.
+type ProducerOptions struct {
+ // Specify the topic this producer will be publishing on.
+ // This argument is required when constructing the producer.
+ Topic string
+
+ // Specify a name for the producer
+ // If not assigned, the system will generate a globally unique name which can be accessed with
+ // Producer.Name().
+ // When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique
+ // across all Pulsar's clusters. Brokers will enforce that only a single producer with a given name can be
+ // publishing on a topic.
+ Name string
+
+ // Attach a set of application defined properties to the producer
+ // These properties will be visible in the topic stats
+ Properties map[string]string
+
+ // Set the send timeout (default: 30 seconds)
+ // If a message is not acknowledged by the server before the SendTimeout expires, an error will be reported.
+ // Setting the timeout to -1, will set the timeout to infinity, which can be useful when using Pulsar's message
+ // deduplication feature.
+ SendTimeout time.Duration
+
+ // Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
+ // When the queue is full, by default, all calls to Producer.Send() and Producer.SendAsync() will fail
+ // unless `BlockIfQueueFull` is set to true. Use `BlockIfQueueFull` to change the blocking behavior.
+ MaxPendingMessages int
+
+ // Set the number of max pending messages across all the partitions
+ // This setting will be used to lower the max pending messages for each partition
+ // (`MaxPendingMessages`), if the total exceeds the configured value.
+ MaxPendingMessagesAcrossPartitions int
+
+ // Set whether the `Producer.Send()` and `Producer.SendAsync()` operations should block when the outgoing
+ // message queue is full. Default is `false`. If set to `false`, send operations will immediately fail with
+ // `ProducerQueueIsFullError` when there is no space left in pending queue.
+ BlockIfQueueFull bool
+
+ // Set the message routing mode for the partitioned producer.
+ // Default routing mode is round-robin routing.
+ //
+ // This logic is applied when the application is not setting `ProducerMessage.Key` on a
+ // particular message.
+ MessageRoutingMode
+
+ // Change the `HashingScheme` used to choose the partition on which to publish a particular message.
+ // Standard hashing functions available are:
+ //
+ // - `JavaStringHash` : Java String.hashCode() equivalent
+ // - `Murmur3_32Hash` : Use Murmur3 hashing function.
+ // https://en.wikipedia.org/wiki/MurmurHash
+ // - `BoostHash` : C++ based boost::hash
+ //
+ // Default is `JavaStringHash`.
+ HashingScheme
+
+ // Set the compression type for the producer.
+ // By default, message payloads are not compressed. Supported compression types are:
+ // - LZ4
+ // - ZLib
+ // - ZSTD
+ //
+ // Note: ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
+ // release in order to be able to receive messages compressed with ZSTD.
+ CompressionType
+
+ // Set a custom message routing policy by passing an implementation of MessageRouter
+ // The router is a function that given a particular message and the topic metadata, returns the
+ // partition index where the message should be routed to
+ MessageRouter func(Message, TopicMetadata) int
+
+ // Control whether automatic batching of messages is enabled for the producer. Default: false [No batching]
+ //
+ // When batching is enabled, multiple calls to Producer.SendAsync can result in a single batch to be sent to the
+ // broker, leading to better throughput, especially when publishing small messages. If compression is enabled,
+ // messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or
+ // contents.
+ //
+ // When enabled default batch delay is set to 1 ms and default batch size is 1000 messages
+ // NOTE(review): this 1 ms default disagrees with the 10ms default documented on BatchingMaxPublishDelay;
+ // confirm which one is intended.
+ Batching bool
+
+ // Set the time period within which the messages sent will be batched (default: 10ms) if batch messages are
+ // enabled. If set to a non zero value, messages will be queued until this time interval has elapsed or until
+ // the `BatchingMaxMessages` threshold is reached.
+ BatchingMaxPublishDelay time.Duration
+
+ // Set the maximum number of messages permitted in a batch. (default: 1000) If set to a value greater than 1,
+ // messages will be queued until this threshold is reached or batch interval has elapsed
+ BatchingMaxMessages uint
+}
+
+// Producer is used to publish messages on a topic
+type Producer interface {
+ // Return the topic which the producer is publishing to
+ Topic() string
+
+ // Return the producer name which could have been assigned by the system or specified by the client
+ Name() string
+
+ // Send a message
+ // This call will be blocking until the message is successfully acknowledged by the Pulsar broker.
+ // Example:
+ // producer.Send(ctx, pulsar.ProducerMessage{ Payload: myPayload })
+ Send(context.Context, ProducerMessage) error
+
+ // Send a message in asynchronous mode
+ // The callback will report back the message being published and
+ // the eventual error in publishing
+ SendAsync(context.Context, ProducerMessage, func(ProducerMessage, error))
+
+ // Get the last sequence id that was published by this producer.
+ // This represents either the automatically assigned or custom sequence id (set on the ProducerMessage) that
+ // was published and acknowledged by the broker.
+ // After recreating a producer with the same producer name, this will return the last message that was
+ // published in the previous producer session, or -1 if no message was ever published.
+ LastSequenceID() int64
+
+ // Flush all the messages buffered in the client and wait until all messages have been successfully
+ // persisted.
+ Flush() error
+
+ // Close the producer and release the resources allocated
+ // No more writes will be accepted from this producer. Waits until all pending write requests are persisted. In
+ // case of errors, pending writes will not be retried.
+ Close() error
+}
diff --git a/pulsar/pulsar_proto/PulsarApi.pb.go b/pulsar/pulsar_proto/PulsarApi.pb.go
new file mode 100644
index 0000000..cb82acd
--- /dev/null
+++ b/pulsar/pulsar_proto/PulsarApi.pb.go
@@ -0,0 +1,4043 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: PulsarApi.proto
+
+package pulsar_proto
+
+import proto "github.com/golang/protobuf/proto"
+import fmt "fmt"
+import math "math"
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+type CompressionType int32 // CompressionType is a generated protobuf enum with members NONE, LZ4, ZLIB and ZSTD.
+
+const (
+ CompressionType_NONE CompressionType = 0
+ CompressionType_LZ4 CompressionType = 1
+ CompressionType_ZLIB CompressionType = 2
+ CompressionType_ZSTD CompressionType = 3
+)
+
+var CompressionType_name = map[int32]string{ // numeric value -> name; read by String.
+ 0: "NONE",
+ 1: "LZ4",
+ 2: "ZLIB",
+ 3: "ZSTD",
+}
+var CompressionType_value = map[string]int32{ // name -> numeric value; read by UnmarshalJSON.
+ "NONE": 0,
+ "LZ4": 1,
+ "ZLIB": 2,
+ "ZSTD": 3,
+}
+
+func (x CompressionType) Enum() *CompressionType { // Enum returns a pointer to a newly allocated copy of x.
+ p := new(CompressionType)
+ *p = x
+ return p
+}
+func (x CompressionType) String() string { // String resolves x through CompressionType_name using proto.EnumName.
+ return proto.EnumName(CompressionType_name, int32(x))
+}
+func (x *CompressionType) UnmarshalJSON(data []byte) error { // UnmarshalJSON parses data with proto.UnmarshalJSONEnum; *x is assigned only on success.
+ value, err := proto.UnmarshalJSONEnum(CompressionType_value, data, "CompressionType")
+ if err != nil {
+ return err
+ }
+ *x = CompressionType(value)
+ return nil
+}
+func (CompressionType) EnumDescriptor() ([]byte, []int) { // EnumDescriptor returns the file descriptor value (declared outside this chunk) and the index path {0}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{0}
+}
+
+type ServerError int32 // ServerError is a generated protobuf enum with named members numbered 0 through 18.
+
+const (
+ ServerError_UnknownError ServerError = 0
+ ServerError_MetadataError ServerError = 1
+ ServerError_PersistenceError ServerError = 2
+ ServerError_AuthenticationError ServerError = 3
+ ServerError_AuthorizationError ServerError = 4
+ ServerError_ConsumerBusy ServerError = 5
+ // other consumers are connected (NOTE(review): reads like the tail of a ConsumerBusy comment rather than a description of ServiceNotReady; confirm against PulsarApi.proto)
+ ServerError_ServiceNotReady ServerError = 6
+ ServerError_ProducerBlockedQuotaExceededError ServerError = 7
+ ServerError_ProducerBlockedQuotaExceededException ServerError = 8
+ ServerError_ChecksumError ServerError = 9
+ ServerError_UnsupportedVersionError ServerError = 10
+ ServerError_TopicNotFound ServerError = 11
+ ServerError_SubscriptionNotFound ServerError = 12
+ ServerError_ConsumerNotFound ServerError = 13
+ ServerError_TooManyRequests ServerError = 14
+ ServerError_TopicTerminatedError ServerError = 15
+ ServerError_ProducerBusy ServerError = 16
+ ServerError_InvalidTopicName ServerError = 17
+ ServerError_IncompatibleSchema ServerError = 18
+)
+
+var ServerError_name = map[int32]string{ // numeric value -> name; read by String.
+ 0: "UnknownError",
+ 1: "MetadataError",
+ 2: "PersistenceError",
+ 3: "AuthenticationError",
+ 4: "AuthorizationError",
+ 5: "ConsumerBusy",
+ 6: "ServiceNotReady",
+ 7: "ProducerBlockedQuotaExceededError",
+ 8: "ProducerBlockedQuotaExceededException",
+ 9: "ChecksumError",
+ 10: "UnsupportedVersionError",
+ 11: "TopicNotFound",
+ 12: "SubscriptionNotFound",
+ 13: "ConsumerNotFound",
+ 14: "TooManyRequests",
+ 15: "TopicTerminatedError",
+ 16: "ProducerBusy",
+ 17: "InvalidTopicName",
+ 18: "IncompatibleSchema",
+}
+var ServerError_value = map[string]int32{ // name -> numeric value; read by UnmarshalJSON.
+ "UnknownError": 0,
+ "MetadataError": 1,
+ "PersistenceError": 2,
+ "AuthenticationError": 3,
+ "AuthorizationError": 4,
+ "ConsumerBusy": 5,
+ "ServiceNotReady": 6,
+ "ProducerBlockedQuotaExceededError": 7,
+ "ProducerBlockedQuotaExceededException": 8,
+ "ChecksumError": 9,
+ "UnsupportedVersionError": 10,
+ "TopicNotFound": 11,
+ "SubscriptionNotFound": 12,
+ "ConsumerNotFound": 13,
+ "TooManyRequests": 14,
+ "TopicTerminatedError": 15,
+ "ProducerBusy": 16,
+ "InvalidTopicName": 17,
+ "IncompatibleSchema": 18,
+}
+
+func (x ServerError) Enum() *ServerError { // Enum returns a pointer to a newly allocated copy of x.
+ p := new(ServerError)
+ *p = x
+ return p
+}
+func (x ServerError) String() string { // String resolves x through ServerError_name using proto.EnumName.
+ return proto.EnumName(ServerError_name, int32(x))
+}
+func (x *ServerError) UnmarshalJSON(data []byte) error { // UnmarshalJSON parses data with proto.UnmarshalJSONEnum; *x is assigned only on success.
+ value, err := proto.UnmarshalJSONEnum(ServerError_value, data, "ServerError")
+ if err != nil {
+ return err
+ }
+ *x = ServerError(value)
+ return nil
+}
+func (ServerError) EnumDescriptor() ([]byte, []int) { // EnumDescriptor returns the file descriptor value (declared outside this chunk) and the index path {1}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{1}
+}
+
+type AuthMethod int32 // AuthMethod is a generated protobuf enum with members AuthMethodNone, AuthMethodYcaV1 and AuthMethodAthens.
+
+const (
+ AuthMethod_AuthMethodNone AuthMethod = 0
+ AuthMethod_AuthMethodYcaV1 AuthMethod = 1
+ AuthMethod_AuthMethodAthens AuthMethod = 2
+)
+
+var AuthMethod_name = map[int32]string{ // numeric value -> name; read by String.
+ 0: "AuthMethodNone",
+ 1: "AuthMethodYcaV1",
+ 2: "AuthMethodAthens",
+}
+var AuthMethod_value = map[string]int32{ // name -> numeric value; read by UnmarshalJSON.
+ "AuthMethodNone": 0,
+ "AuthMethodYcaV1": 1,
+ "AuthMethodAthens": 2,
+}
+
+func (x AuthMethod) Enum() *AuthMethod { // Enum returns a pointer to a newly allocated copy of x.
+ p := new(AuthMethod)
+ *p = x
+ return p
+}
+func (x AuthMethod) String() string { // String resolves x through AuthMethod_name using proto.EnumName.
+ return proto.EnumName(AuthMethod_name, int32(x))
+}
+func (x *AuthMethod) UnmarshalJSON(data []byte) error { // UnmarshalJSON parses data with proto.UnmarshalJSONEnum; *x is assigned only on success.
+ value, err := proto.UnmarshalJSONEnum(AuthMethod_value, data, "AuthMethod")
+ if err != nil {
+ return err
+ }
+ *x = AuthMethod(value)
+ return nil
+}
+func (AuthMethod) EnumDescriptor() ([]byte, []int) { // EnumDescriptor returns the file descriptor value (declared outside this chunk) and the index path {2}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{2}
+}
+
+// Each protocol version identifies new features that are
+// incrementally added to the protocol.
+type ProtocolVersion int32
+
+const (
+ ProtocolVersion_v0 ProtocolVersion = 0
+ ProtocolVersion_v1 ProtocolVersion = 1
+ ProtocolVersion_v2 ProtocolVersion = 2
+ ProtocolVersion_v3 ProtocolVersion = 3
+ ProtocolVersion_v4 ProtocolVersion = 4
+ ProtocolVersion_v5 ProtocolVersion = 5
+ ProtocolVersion_v6 ProtocolVersion = 6
+ ProtocolVersion_v7 ProtocolVersion = 7
+ ProtocolVersion_v8 ProtocolVersion = 8
+ ProtocolVersion_v9 ProtocolVersion = 9
+ ProtocolVersion_v10 ProtocolVersion = 10
+ ProtocolVersion_v11 ProtocolVersion = 11
+ ProtocolVersion_v12 ProtocolVersion = 12
+ // Added CommandActiveConsumerChange
+ // Added CommandGetTopicsOfNamespace (NOTE(review): these two notes may be trailing comments of v12 rather than of v13; confirm against PulsarApi.proto)
+ ProtocolVersion_v13 ProtocolVersion = 13
+)
+
+var ProtocolVersion_name = map[int32]string{ // numeric value -> name; read by String.
+ 0: "v0",
+ 1: "v1",
+ 2: "v2",
+ 3: "v3",
+ 4: "v4",
+ 5: "v5",
+ 6: "v6",
+ 7: "v7",
+ 8: "v8",
+ 9: "v9",
+ 10: "v10",
+ 11: "v11",
+ 12: "v12",
+ 13: "v13",
+}
+var ProtocolVersion_value = map[string]int32{ // name -> numeric value; read by UnmarshalJSON.
+ "v0": 0,
+ "v1": 1,
+ "v2": 2,
+ "v3": 3,
+ "v4": 4,
+ "v5": 5,
+ "v6": 6,
+ "v7": 7,
+ "v8": 8,
+ "v9": 9,
+ "v10": 10,
+ "v11": 11,
+ "v12": 12,
+ "v13": 13,
+}
+
+func (x ProtocolVersion) Enum() *ProtocolVersion { // Enum returns a pointer to a newly allocated copy of x.
+ p := new(ProtocolVersion)
+ *p = x
+ return p
+}
+func (x ProtocolVersion) String() string { // String resolves x through ProtocolVersion_name using proto.EnumName.
+ return proto.EnumName(ProtocolVersion_name, int32(x))
+}
+func (x *ProtocolVersion) UnmarshalJSON(data []byte) error { // UnmarshalJSON parses data with proto.UnmarshalJSONEnum; *x is assigned only on success.
+ value, err := proto.UnmarshalJSONEnum(ProtocolVersion_value, data, "ProtocolVersion")
+ if err != nil {
+ return err
+ }
+ *x = ProtocolVersion(value)
+ return nil
+}
+func (ProtocolVersion) EnumDescriptor() ([]byte, []int) { // EnumDescriptor returns the file descriptor value (declared outside this chunk) and the index path {3}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{3}
+}
+
+type Schema_Type int32 // Schema_Type is a generated protobuf enum with members None, String, Json, Protobuf and Avro; it is the type of Schema.Type.
+
+const (
+ Schema_None Schema_Type = 0
+ Schema_String Schema_Type = 1
+ Schema_Json Schema_Type = 2
+ Schema_Protobuf Schema_Type = 3
+ Schema_Avro Schema_Type = 4
+)
+
+var Schema_Type_name = map[int32]string{ // numeric value -> name; read by String.
+ 0: "None",
+ 1: "String",
+ 2: "Json",
+ 3: "Protobuf",
+ 4: "Avro",
+}
+var Schema_Type_value = map[string]int32{ // name -> numeric value; read by UnmarshalJSON.
+ "None": 0,
+ "String": 1,
+ "Json": 2,
+ "Protobuf": 3,
+ "Avro": 4,
+}
+
+func (x Schema_Type) Enum() *Schema_Type { // Enum returns a pointer to a newly allocated copy of x.
+ p := new(Schema_Type)
+ *p = x
+ return p
+}
+func (x Schema_Type) String() string { // String resolves x through Schema_Type_name using proto.EnumName.
+ return proto.EnumName(Schema_Type_name, int32(x))
+}
+func (x *Schema_Type) UnmarshalJSON(data []byte) error { // UnmarshalJSON parses data with proto.UnmarshalJSONEnum; *x is assigned only on success.
+ value, err := proto.UnmarshalJSONEnum(Schema_Type_value, data, "Schema_Type")
+ if err != nil {
+ return err
+ }
+ *x = Schema_Type(value)
+ return nil
+}
+func (Schema_Type) EnumDescriptor() ([]byte, []int) { // EnumDescriptor returns the file descriptor value (declared outside this chunk) and the index path {0, 0}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{0, 0}
+}
+
+type CommandSubscribe_SubType int32 // CommandSubscribe_SubType is a generated protobuf enum with members Exclusive, Shared and Failover.
+
+const (
+ CommandSubscribe_Exclusive CommandSubscribe_SubType = 0
+ CommandSubscribe_Shared CommandSubscribe_SubType = 1
+ CommandSubscribe_Failover CommandSubscribe_SubType = 2
+)
+
+var CommandSubscribe_SubType_name = map[int32]string{ // numeric value -> name; read by String.
+ 0: "Exclusive",
+ 1: "Shared",
+ 2: "Failover",
+}
+var CommandSubscribe_SubType_value = map[string]int32{ // name -> numeric value; read by UnmarshalJSON.
+ "Exclusive": 0,
+ "Shared": 1,
+ "Failover": 2,
+}
+
+func (x CommandSubscribe_SubType) Enum() *CommandSubscribe_SubType { // Enum returns a pointer to a newly allocated copy of x.
+ p := new(CommandSubscribe_SubType)
+ *p = x
+ return p
+}
+func (x CommandSubscribe_SubType) String() string { // String resolves x through CommandSubscribe_SubType_name using proto.EnumName.
+ return proto.EnumName(CommandSubscribe_SubType_name, int32(x))
+}
+func (x *CommandSubscribe_SubType) UnmarshalJSON(data []byte) error { // UnmarshalJSON parses data with proto.UnmarshalJSONEnum; *x is assigned only on success.
+ value, err := proto.UnmarshalJSONEnum(CommandSubscribe_SubType_value, data, "CommandSubscribe_SubType")
+ if err != nil {
+ return err
+ }
+ *x = CommandSubscribe_SubType(value)
+ return nil
+}
+func (CommandSubscribe_SubType) EnumDescriptor() ([]byte, []int) { // EnumDescriptor returns the file descriptor value (declared outside this chunk) and the index path {9, 0}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{9, 0}
+}
+
+type CommandSubscribe_InitialPosition int32 // CommandSubscribe_InitialPosition is a generated protobuf enum with members Latest and Earliest.
+
+const (
+ CommandSubscribe_Latest CommandSubscribe_InitialPosition = 0
+ CommandSubscribe_Earliest CommandSubscribe_InitialPosition = 1
+)
+
+var CommandSubscribe_InitialPosition_name = map[int32]string{ // numeric value -> name; read by String.
+ 0: "Latest",
+ 1: "Earliest",
+}
+var CommandSubscribe_InitialPosition_value = map[string]int32{ // name -> numeric value; read by UnmarshalJSON.
+ "Latest": 0,
+ "Earliest": 1,
+}
+
+func (x CommandSubscribe_InitialPosition) Enum() *CommandSubscribe_InitialPosition { // Enum returns a pointer to a newly allocated copy of x.
+ p := new(CommandSubscribe_InitialPosition)
+ *p = x
+ return p
+}
+func (x CommandSubscribe_InitialPosition) String() string { // String resolves x through CommandSubscribe_InitialPosition_name using proto.EnumName.
+ return proto.EnumName(CommandSubscribe_InitialPosition_name, int32(x))
+}
+func (x *CommandSubscribe_InitialPosition) UnmarshalJSON(data []byte) error { // UnmarshalJSON parses data with proto.UnmarshalJSONEnum; *x is assigned only on success.
+ value, err := proto.UnmarshalJSONEnum(CommandSubscribe_InitialPosition_value, data, "CommandSubscribe_InitialPosition")
+ if err != nil {
+ return err
+ }
+ *x = CommandSubscribe_InitialPosition(value)
+ return nil
+}
+func (CommandSubscribe_InitialPosition) EnumDescriptor() ([]byte, []int) { // EnumDescriptor returns the file descriptor value (declared outside this chunk) and the index path {9, 1}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{9, 1}
+}
+
+type CommandPartitionedTopicMetadataResponse_LookupType int32 // Generated protobuf enum with members Success and Failed.
+
+const (
+ CommandPartitionedTopicMetadataResponse_Success CommandPartitionedTopicMetadataResponse_LookupType = 0
+ CommandPartitionedTopicMetadataResponse_Failed CommandPartitionedTopicMetadataResponse_LookupType = 1
+)
+
+var CommandPartitionedTopicMetadataResponse_LookupType_name = map[int32]string{ // numeric value -> name; read by String.
+ 0: "Success",
+ 1: "Failed",
+}
+var CommandPartitionedTopicMetadataResponse_LookupType_value = map[string]int32{ // name -> numeric value; read by UnmarshalJSON.
+ "Success": 0,
+ "Failed": 1,
+}
+
+func (x CommandPartitionedTopicMetadataResponse_LookupType) Enum() *CommandPartitionedTopicMetadataResponse_LookupType { // Enum returns a pointer to a newly allocated copy of x.
+ p := new(CommandPartitionedTopicMetadataResponse_LookupType)
+ *p = x
+ return p
+}
+func (x CommandPartitionedTopicMetadataResponse_LookupType) String() string { // String resolves x through the _name map above using proto.EnumName.
+ return proto.EnumName(CommandPartitionedTopicMetadataResponse_LookupType_name, int32(x))
+}
+func (x *CommandPartitionedTopicMetadataResponse_LookupType) UnmarshalJSON(data []byte) error { // UnmarshalJSON parses data with proto.UnmarshalJSONEnum; *x is assigned only on success.
+ value, err := proto.UnmarshalJSONEnum(CommandPartitionedTopicMetadataResponse_LookupType_value, data, "CommandPartitionedTopicMetadataResponse_LookupType")
+ if err != nil {
+ return err
+ }
+ *x = CommandPartitionedTopicMetadataResponse_LookupType(value)
+ return nil
+}
+func (CommandPartitionedTopicMetadataResponse_LookupType) EnumDescriptor() ([]byte, []int) { // EnumDescriptor returns the file descriptor value (declared outside this chunk) and the index path {11, 0}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{11, 0}
+}
+
+type CommandLookupTopicResponse_LookupType int32 // Generated protobuf enum with members Redirect, Connect and Failed.
+
+const (
+ CommandLookupTopicResponse_Redirect CommandLookupTopicResponse_LookupType = 0
+ CommandLookupTopicResponse_Connect CommandLookupTopicResponse_LookupType = 1
+ CommandLookupTopicResponse_Failed CommandLookupTopicResponse_LookupType = 2
+)
+
+var CommandLookupTopicResponse_LookupType_name = map[int32]string{ // numeric value -> name; read by String.
+ 0: "Redirect",
+ 1: "Connect",
+ 2: "Failed",
+}
+var CommandLookupTopicResponse_LookupType_value = map[string]int32{ // name -> numeric value; read by UnmarshalJSON.
+ "Redirect": 0,
+ "Connect": 1,
+ "Failed": 2,
+}
+
+func (x CommandLookupTopicResponse_LookupType) Enum() *CommandLookupTopicResponse_LookupType { // Enum returns a pointer to a newly allocated copy of x.
+ p := new(CommandLookupTopicResponse_LookupType)
+ *p = x
+ return p
+}
+func (x CommandLookupTopicResponse_LookupType) String() string { // String resolves x through the _name map above using proto.EnumName.
+ return proto.EnumName(CommandLookupTopicResponse_LookupType_name, int32(x))
+}
+func (x *CommandLookupTopicResponse_LookupType) UnmarshalJSON(data []byte) error { // UnmarshalJSON parses data with proto.UnmarshalJSONEnum; *x is assigned only on success.
+ value, err := proto.UnmarshalJSONEnum(CommandLookupTopicResponse_LookupType_value, data, "CommandLookupTopicResponse_LookupType")
+ if err != nil {
+ return err
+ }
+ *x = CommandLookupTopicResponse_LookupType(value)
+ return nil
+}
+func (CommandLookupTopicResponse_LookupType) EnumDescriptor() ([]byte, []int) { // EnumDescriptor returns the file descriptor value (declared outside this chunk) and the index path {13, 0}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{13, 0}
+}
+
+type CommandAck_AckType int32 // CommandAck_AckType is a generated protobuf enum with members Individual and Cumulative.
+
+const (
+ CommandAck_Individual CommandAck_AckType = 0
+ CommandAck_Cumulative CommandAck_AckType = 1
+)
+
+var CommandAck_AckType_name = map[int32]string{ // numeric value -> name; read by String.
+ 0: "Individual",
+ 1: "Cumulative",
+}
+var CommandAck_AckType_value = map[string]int32{ // name -> numeric value; read by UnmarshalJSON.
+ "Individual": 0,
+ "Cumulative": 1,
+}
+
+func (x CommandAck_AckType) Enum() *CommandAck_AckType { // Enum returns a pointer to a newly allocated copy of x.
+ p := new(CommandAck_AckType)
+ *p = x
+ return p
+}
+func (x CommandAck_AckType) String() string { // String resolves x through CommandAck_AckType_name using proto.EnumName.
+ return proto.EnumName(CommandAck_AckType_name, int32(x))
+}
+func (x *CommandAck_AckType) UnmarshalJSON(data []byte) error { // UnmarshalJSON parses data with proto.UnmarshalJSONEnum; *x is assigned only on success.
+ value, err := proto.UnmarshalJSONEnum(CommandAck_AckType_value, data, "CommandAck_AckType")
+ if err != nil {
+ return err
+ }
+ *x = CommandAck_AckType(value)
+ return nil
+}
+func (CommandAck_AckType) EnumDescriptor() ([]byte, []int) { // EnumDescriptor returns the file descriptor value (declared outside this chunk) and the index path {19, 0}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{19, 0}
+}
+
+// Acks can contain a flag to indicate that the consumer
+// received an invalid message that got discarded
+// before being passed on to the application.
+type CommandAck_ValidationError int32
+
+const (
+ CommandAck_UncompressedSizeCorruption CommandAck_ValidationError = 0
+ CommandAck_DecompressionError CommandAck_ValidationError = 1
+ CommandAck_ChecksumMismatch CommandAck_ValidationError = 2
+ CommandAck_BatchDeSerializeError CommandAck_ValidationError = 3
+ CommandAck_DecryptionError CommandAck_ValidationError = 4
+)
+
+var CommandAck_ValidationError_name = map[int32]string{ // numeric value -> name; read by String.
+ 0: "UncompressedSizeCorruption",
+ 1: "DecompressionError",
+ 2: "ChecksumMismatch",
+ 3: "BatchDeSerializeError",
+ 4: "DecryptionError",
+}
+var CommandAck_ValidationError_value = map[string]int32{ // name -> numeric value; read by UnmarshalJSON.
+ "UncompressedSizeCorruption": 0,
+ "DecompressionError": 1,
+ "ChecksumMismatch": 2,
+ "BatchDeSerializeError": 3,
+ "DecryptionError": 4,
+}
+
+func (x CommandAck_ValidationError) Enum() *CommandAck_ValidationError { // Enum returns a pointer to a newly allocated copy of x.
+ p := new(CommandAck_ValidationError)
+ *p = x
+ return p
+}
+func (x CommandAck_ValidationError) String() string { // String resolves x through CommandAck_ValidationError_name using proto.EnumName.
+ return proto.EnumName(CommandAck_ValidationError_name, int32(x))
+}
+func (x *CommandAck_ValidationError) UnmarshalJSON(data []byte) error { // UnmarshalJSON parses data with proto.UnmarshalJSONEnum; *x is assigned only on success.
+ value, err := proto.UnmarshalJSONEnum(CommandAck_ValidationError_value, data, "CommandAck_ValidationError")
+ if err != nil {
+ return err
+ }
+ *x = CommandAck_ValidationError(value)
+ return nil
+}
+func (CommandAck_ValidationError) EnumDescriptor() ([]byte, []int) { // EnumDescriptor returns the file descriptor value (declared outside this chunk) and the index path {19, 1}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{19, 1}
+}
+
+type CommandGetTopicsOfNamespace_Mode int32 // Generated protobuf enum with members PERSISTENT, NON_PERSISTENT and ALL.
+
+const (
+ CommandGetTopicsOfNamespace_PERSISTENT CommandGetTopicsOfNamespace_Mode = 0
+ CommandGetTopicsOfNamespace_NON_PERSISTENT CommandGetTopicsOfNamespace_Mode = 1
+ CommandGetTopicsOfNamespace_ALL CommandGetTopicsOfNamespace_Mode = 2
+)
+
+var CommandGetTopicsOfNamespace_Mode_name = map[int32]string{ // numeric value -> name; read by String.
+ 0: "PERSISTENT",
+ 1: "NON_PERSISTENT",
+ 2: "ALL",
+}
+var CommandGetTopicsOfNamespace_Mode_value = map[string]int32{ // name -> numeric value; read by UnmarshalJSON.
+ "PERSISTENT": 0,
+ "NON_PERSISTENT": 1,
+ "ALL": 2,
+}
+
+func (x CommandGetTopicsOfNamespace_Mode) Enum() *CommandGetTopicsOfNamespace_Mode { // Enum returns a pointer to a newly allocated copy of x.
+ p := new(CommandGetTopicsOfNamespace_Mode)
+ *p = x
+ return p
+}
+func (x CommandGetTopicsOfNamespace_Mode) String() string { // String resolves x through the _name map above using proto.EnumName.
+ return proto.EnumName(CommandGetTopicsOfNamespace_Mode_name, int32(x))
+}
+func (x *CommandGetTopicsOfNamespace_Mode) UnmarshalJSON(data []byte) error { // UnmarshalJSON parses data with proto.UnmarshalJSONEnum; *x is assigned only on success.
+ value, err := proto.UnmarshalJSONEnum(CommandGetTopicsOfNamespace_Mode_value, data, "CommandGetTopicsOfNamespace_Mode")
+ if err != nil {
+ return err
+ }
+ *x = CommandGetTopicsOfNamespace_Mode(value)
+ return nil
+}
+func (CommandGetTopicsOfNamespace_Mode) EnumDescriptor() ([]byte, []int) { // EnumDescriptor returns the file descriptor value (declared outside this chunk) and the index path {37, 0}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{37, 0}
+}
+
+type BaseCommand_Type int32 // BaseCommand_Type is a generated protobuf enum; its named members run from 2 (CONNECT) to 35, and 0 and 1 have no entry in the maps below.
+
+const (
+ BaseCommand_CONNECT BaseCommand_Type = 2
+ BaseCommand_CONNECTED BaseCommand_Type = 3
+ BaseCommand_SUBSCRIBE BaseCommand_Type = 4
+ BaseCommand_PRODUCER BaseCommand_Type = 5
+ BaseCommand_SEND BaseCommand_Type = 6
+ BaseCommand_SEND_RECEIPT BaseCommand_Type = 7
+ BaseCommand_SEND_ERROR BaseCommand_Type = 8
+ BaseCommand_MESSAGE BaseCommand_Type = 9
+ BaseCommand_ACK BaseCommand_Type = 10
+ BaseCommand_FLOW BaseCommand_Type = 11
+ BaseCommand_UNSUBSCRIBE BaseCommand_Type = 12
+ BaseCommand_SUCCESS BaseCommand_Type = 13
+ BaseCommand_ERROR BaseCommand_Type = 14
+ BaseCommand_CLOSE_PRODUCER BaseCommand_Type = 15
+ BaseCommand_CLOSE_CONSUMER BaseCommand_Type = 16
+ BaseCommand_PRODUCER_SUCCESS BaseCommand_Type = 17
+ BaseCommand_PING BaseCommand_Type = 18
+ BaseCommand_PONG BaseCommand_Type = 19
+ BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES BaseCommand_Type = 20
+ BaseCommand_PARTITIONED_METADATA BaseCommand_Type = 21
+ BaseCommand_PARTITIONED_METADATA_RESPONSE BaseCommand_Type = 22
+ BaseCommand_LOOKUP BaseCommand_Type = 23
+ BaseCommand_LOOKUP_RESPONSE BaseCommand_Type = 24
+ BaseCommand_CONSUMER_STATS BaseCommand_Type = 25
+ BaseCommand_CONSUMER_STATS_RESPONSE BaseCommand_Type = 26
+ BaseCommand_REACHED_END_OF_TOPIC BaseCommand_Type = 27
+ BaseCommand_SEEK BaseCommand_Type = 28
+ BaseCommand_GET_LAST_MESSAGE_ID BaseCommand_Type = 29
+ BaseCommand_GET_LAST_MESSAGE_ID_RESPONSE BaseCommand_Type = 30
+ BaseCommand_ACTIVE_CONSUMER_CHANGE BaseCommand_Type = 31
+ BaseCommand_GET_TOPICS_OF_NAMESPACE BaseCommand_Type = 32
+ BaseCommand_GET_TOPICS_OF_NAMESPACE_RESPONSE BaseCommand_Type = 33
+ BaseCommand_GET_SCHEMA BaseCommand_Type = 34
+ BaseCommand_GET_SCHEMA_RESPONSE BaseCommand_Type = 35
+)
+
+var BaseCommand_Type_name = map[int32]string{ // numeric value -> name; read by String.
+ 2: "CONNECT",
+ 3: "CONNECTED",
+ 4: "SUBSCRIBE",
+ 5: "PRODUCER",
+ 6: "SEND",
+ 7: "SEND_RECEIPT",
+ 8: "SEND_ERROR",
+ 9: "MESSAGE",
+ 10: "ACK",
+ 11: "FLOW",
+ 12: "UNSUBSCRIBE",
+ 13: "SUCCESS",
+ 14: "ERROR",
+ 15: "CLOSE_PRODUCER",
+ 16: "CLOSE_CONSUMER",
+ 17: "PRODUCER_SUCCESS",
+ 18: "PING",
+ 19: "PONG",
+ 20: "REDELIVER_UNACKNOWLEDGED_MESSAGES",
+ 21: "PARTITIONED_METADATA",
+ 22: "PARTITIONED_METADATA_RESPONSE",
+ 23: "LOOKUP",
+ 24: "LOOKUP_RESPONSE",
+ 25: "CONSUMER_STATS",
+ 26: "CONSUMER_STATS_RESPONSE",
+ 27: "REACHED_END_OF_TOPIC",
+ 28: "SEEK",
+ 29: "GET_LAST_MESSAGE_ID",
+ 30: "GET_LAST_MESSAGE_ID_RESPONSE",
+ 31: "ACTIVE_CONSUMER_CHANGE",
+ 32: "GET_TOPICS_OF_NAMESPACE",
+ 33: "GET_TOPICS_OF_NAMESPACE_RESPONSE",
+ 34: "GET_SCHEMA",
+ 35: "GET_SCHEMA_RESPONSE",
+}
+var BaseCommand_Type_value = map[string]int32{ // name -> numeric value; read by UnmarshalJSON.
+ "CONNECT": 2,
+ "CONNECTED": 3,
+ "SUBSCRIBE": 4,
+ "PRODUCER": 5,
+ "SEND": 6,
+ "SEND_RECEIPT": 7,
+ "SEND_ERROR": 8,
+ "MESSAGE": 9,
+ "ACK": 10,
+ "FLOW": 11,
+ "UNSUBSCRIBE": 12,
+ "SUCCESS": 13,
+ "ERROR": 14,
+ "CLOSE_PRODUCER": 15,
+ "CLOSE_CONSUMER": 16,
+ "PRODUCER_SUCCESS": 17,
+ "PING": 18,
+ "PONG": 19,
+ "REDELIVER_UNACKNOWLEDGED_MESSAGES": 20,
+ "PARTITIONED_METADATA": 21,
+ "PARTITIONED_METADATA_RESPONSE": 22,
+ "LOOKUP": 23,
+ "LOOKUP_RESPONSE": 24,
+ "CONSUMER_STATS": 25,
+ "CONSUMER_STATS_RESPONSE": 26,
+ "REACHED_END_OF_TOPIC": 27,
+ "SEEK": 28,
+ "GET_LAST_MESSAGE_ID": 29,
+ "GET_LAST_MESSAGE_ID_RESPONSE": 30,
+ "ACTIVE_CONSUMER_CHANGE": 31,
+ "GET_TOPICS_OF_NAMESPACE": 32,
+ "GET_TOPICS_OF_NAMESPACE_RESPONSE": 33,
+ "GET_SCHEMA": 34,
+ "GET_SCHEMA_RESPONSE": 35,
+}
+
+func (x BaseCommand_Type) Enum() *BaseCommand_Type { // Enum returns a pointer to a newly allocated copy of x.
+ p := new(BaseCommand_Type)
+ *p = x
+ return p
+}
+func (x BaseCommand_Type) String() string { // String resolves x through BaseCommand_Type_name using proto.EnumName.
+ return proto.EnumName(BaseCommand_Type_name, int32(x))
+}
+func (x *BaseCommand_Type) UnmarshalJSON(data []byte) error { // UnmarshalJSON parses data with proto.UnmarshalJSONEnum; *x is assigned only on success.
+ value, err := proto.UnmarshalJSONEnum(BaseCommand_Type_value, data, "BaseCommand_Type")
+ if err != nil {
+ return err
+ }
+ *x = BaseCommand_Type(value)
+ return nil
+}
+func (BaseCommand_Type) EnumDescriptor() ([]byte, []int) { // EnumDescriptor returns the file descriptor value (declared outside this chunk) and the index path {41, 0}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{41, 0}
+}
+
+type Schema struct { // Schema is a generated protobuf message; per the tags, fields 1, 3 and 4 are required and field 5 is repeated.
+ Name *string `protobuf:"bytes,1,req,name=name" json:"name,omitempty"`
+ SchemaData []byte `protobuf:"bytes,3,req,name=schema_data,json=schemaData" json:"schema_data,omitempty"`
+ Type *Schema_Type `protobuf:"varint,4,req,name=type,enum=pulsar.proto.Schema_Type" json:"type,omitempty"`
+ Properties []*KeyValue `protobuf:"bytes,5,rep,name=properties" json:"properties,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *Schema) Reset() { *m = Schema{} } // Reset overwrites *m with an empty Schema.
+func (m *Schema) String() string { return proto.CompactTextString(m) } // String formats m via proto.CompactTextString.
+func (*Schema) ProtoMessage() {} // ProtoMessage is an empty marker method.
+func (*Schema) Descriptor() ([]byte, []int) { // Descriptor returns the file descriptor value (declared outside this chunk) and the index path {0}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{0}
+}
+func (m *Schema) XXX_Unmarshal(b []byte) error { // The XXX_ methods only forward to xxx_messageInfo_Schema.
+ return xxx_messageInfo_Schema.Unmarshal(m, b)
+}
+func (m *Schema) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_Schema.Marshal(b, m, deterministic)
+}
+func (dst *Schema) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_Schema.Merge(dst, src)
+}
+func (m *Schema) XXX_Size() int {
+ return xxx_messageInfo_Schema.Size(m)
+}
+func (m *Schema) XXX_DiscardUnknown() {
+ xxx_messageInfo_Schema.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Schema proto.InternalMessageInfo // target of the XXX_ method calls above
+
+func (m *Schema) GetName() string { // GetName returns *m.Name, or "" when m or the field is nil.
+ if m != nil && m.Name != nil {
+ return *m.Name
+ }
+ return ""
+}
+
+func (m *Schema) GetSchemaData() []byte { // GetSchemaData returns m.SchemaData, or nil when m is nil.
+ if m != nil {
+ return m.SchemaData
+ }
+ return nil
+}
+
+func (m *Schema) GetType() Schema_Type { // GetType returns *m.Type, or Schema_None when m or the field is nil.
+ if m != nil && m.Type != nil {
+ return *m.Type
+ }
+ return Schema_None
+}
+
+func (m *Schema) GetProperties() []*KeyValue { // GetProperties returns m.Properties, or nil when m is nil.
+ if m != nil {
+ return m.Properties
+ }
+ return nil
+}
+
+type MessageIdData struct { // MessageIdData is a generated protobuf message; ledgerId and entryId are required, partition and batch_index are optional with default -1.
+ LedgerId *uint64 `protobuf:"varint,1,req,name=ledgerId" json:"ledgerId,omitempty"`
+ EntryId *uint64 `protobuf:"varint,2,req,name=entryId" json:"entryId,omitempty"`
+ Partition *int32 `protobuf:"varint,3,opt,name=partition,def=-1" json:"partition,omitempty"`
+ BatchIndex *int32 `protobuf:"varint,4,opt,name=batch_index,json=batchIndex,def=-1" json:"batch_index,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *MessageIdData) Reset() { *m = MessageIdData{} } // Reset overwrites *m with an empty MessageIdData.
+func (m *MessageIdData) String() string { return proto.CompactTextString(m) } // String formats m via proto.CompactTextString.
+func (*MessageIdData) ProtoMessage() {} // ProtoMessage is an empty marker method.
+func (*MessageIdData) Descriptor() ([]byte, []int) { // Descriptor returns the file descriptor value (declared outside this chunk) and the index path {1}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{1}
+}
+func (m *MessageIdData) XXX_Unmarshal(b []byte) error { // The XXX_ methods only forward to xxx_messageInfo_MessageIdData.
+ return xxx_messageInfo_MessageIdData.Unmarshal(m, b)
+}
+func (m *MessageIdData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_MessageIdData.Marshal(b, m, deterministic)
+}
+func (dst *MessageIdData) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_MessageIdData.Merge(dst, src)
+}
+func (m *MessageIdData) XXX_Size() int {
+ return xxx_messageInfo_MessageIdData.Size(m)
+}
+func (m *MessageIdData) XXX_DiscardUnknown() {
+ xxx_messageInfo_MessageIdData.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_MessageIdData proto.InternalMessageInfo // target of the XXX_ method calls above
+
+const Default_MessageIdData_Partition int32 = -1 // returned by GetPartition when the field is unset
+const Default_MessageIdData_BatchIndex int32 = -1 // returned by GetBatchIndex when the field is unset
+
+func (m *MessageIdData) GetLedgerId() uint64 { // GetLedgerId returns *m.LedgerId, or 0 when m or the field is nil.
+ if m != nil && m.LedgerId != nil {
+ return *m.LedgerId
+ }
+ return 0
+}
+
+func (m *MessageIdData) GetEntryId() uint64 { // GetEntryId returns *m.EntryId, or 0 when m or the field is nil.
+ if m != nil && m.EntryId != nil {
+ return *m.EntryId
+ }
+ return 0
+}
+
+func (m *MessageIdData) GetPartition() int32 { // GetPartition returns *m.Partition, or Default_MessageIdData_Partition (-1) when m or the field is nil.
+ if m != nil && m.Partition != nil {
+ return *m.Partition
+ }
+ return Default_MessageIdData_Partition
+}
+
+func (m *MessageIdData) GetBatchIndex() int32 { // GetBatchIndex returns *m.BatchIndex, or Default_MessageIdData_BatchIndex (-1) when m or the field is nil.
+ if m != nil && m.BatchIndex != nil {
+ return *m.BatchIndex
+ }
+ return Default_MessageIdData_BatchIndex
+}
+
+type KeyValue struct { // KeyValue is a generated protobuf message holding a required string key and a required string value.
+ Key *string `protobuf:"bytes,1,req,name=key" json:"key,omitempty"`
+ Value *string `protobuf:"bytes,2,req,name=value" json:"value,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *KeyValue) Reset() { *m = KeyValue{} } // Reset overwrites *m with an empty KeyValue.
+func (m *KeyValue) String() string { return proto.CompactTextString(m) } // String formats m via proto.CompactTextString.
+func (*KeyValue) ProtoMessage() {} // ProtoMessage is an empty marker method.
+func (*KeyValue) Descriptor() ([]byte, []int) { // Descriptor returns the file descriptor value (declared outside this chunk) and the index path {2}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{2}
+}
+func (m *KeyValue) XXX_Unmarshal(b []byte) error { // The XXX_ methods only forward to xxx_messageInfo_KeyValue.
+ return xxx_messageInfo_KeyValue.Unmarshal(m, b)
+}
+func (m *KeyValue) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_KeyValue.Marshal(b, m, deterministic)
+}
+func (dst *KeyValue) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_KeyValue.Merge(dst, src)
+}
+func (m *KeyValue) XXX_Size() int {
+ return xxx_messageInfo_KeyValue.Size(m)
+}
+func (m *KeyValue) XXX_DiscardUnknown() {
+ xxx_messageInfo_KeyValue.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_KeyValue proto.InternalMessageInfo // target of the XXX_ method calls above
+
+func (m *KeyValue) GetKey() string { // GetKey returns *m.Key, or "" when m or the field is nil.
+ if m != nil && m.Key != nil {
+ return *m.Key
+ }
+ return ""
+}
+
+func (m *KeyValue) GetValue() string { // GetValue returns *m.Value, or "" when m or the field is nil.
+ if m != nil && m.Value != nil {
+ return *m.Value
+ }
+ return ""
+}
+
+type KeyLongValue struct { // KeyLongValue is a generated protobuf message holding a required string key and a required uint64 value.
+ Key *string `protobuf:"bytes,1,req,name=key" json:"key,omitempty"`
+ Value *uint64 `protobuf:"varint,2,req,name=value" json:"value,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *KeyLongValue) Reset() { *m = KeyLongValue{} } // Reset overwrites *m with an empty KeyLongValue.
+func (m *KeyLongValue) String() string { return proto.CompactTextString(m) } // String formats m via proto.CompactTextString.
+func (*KeyLongValue) ProtoMessage() {} // ProtoMessage is an empty marker method.
+func (*KeyLongValue) Descriptor() ([]byte, []int) { // Descriptor returns the file descriptor value (declared outside this chunk) and the index path {3}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{3}
+}
+func (m *KeyLongValue) XXX_Unmarshal(b []byte) error { // The XXX_ methods only forward to xxx_messageInfo_KeyLongValue.
+ return xxx_messageInfo_KeyLongValue.Unmarshal(m, b)
+}
+func (m *KeyLongValue) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_KeyLongValue.Marshal(b, m, deterministic)
+}
+func (dst *KeyLongValue) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_KeyLongValue.Merge(dst, src)
+}
+func (m *KeyLongValue) XXX_Size() int {
+ return xxx_messageInfo_KeyLongValue.Size(m)
+}
+func (m *KeyLongValue) XXX_DiscardUnknown() {
+ xxx_messageInfo_KeyLongValue.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_KeyLongValue proto.InternalMessageInfo // target of the XXX_ method calls above
+
+func (m *KeyLongValue) GetKey() string { // GetKey returns *m.Key, or "" when m or the field is nil.
+ if m != nil && m.Key != nil {
+ return *m.Key
+ }
+ return ""
+}
+
+func (m *KeyLongValue) GetValue() uint64 { // GetValue returns *m.Value, or 0 when m or the field is nil.
+ if m != nil && m.Value != nil {
+ return *m.Value
+ }
+ return 0
+}
+
+type EncryptionKeys struct { // EncryptionKeys is a generated protobuf message: required string key, required bytes value, repeated KeyValue metadata.
+ Key *string `protobuf:"bytes,1,req,name=key" json:"key,omitempty"`
+ Value []byte `protobuf:"bytes,2,req,name=value" json:"value,omitempty"`
+ Metadata []*KeyValue `protobuf:"bytes,3,rep,name=metadata" json:"metadata,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *EncryptionKeys) Reset() { *m = EncryptionKeys{} } // Reset overwrites *m with an empty EncryptionKeys.
+func (m *EncryptionKeys) String() string { return proto.CompactTextString(m) } // String formats m via proto.CompactTextString.
+func (*EncryptionKeys) ProtoMessage() {} // ProtoMessage is an empty marker method.
+func (*EncryptionKeys) Descriptor() ([]byte, []int) { // Descriptor returns the file descriptor value (declared outside this chunk) and the index path {4}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{4}
+}
+func (m *EncryptionKeys) XXX_Unmarshal(b []byte) error { // The XXX_ methods only forward to xxx_messageInfo_EncryptionKeys.
+ return xxx_messageInfo_EncryptionKeys.Unmarshal(m, b)
+}
+func (m *EncryptionKeys) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_EncryptionKeys.Marshal(b, m, deterministic)
+}
+func (dst *EncryptionKeys) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_EncryptionKeys.Merge(dst, src)
+}
+func (m *EncryptionKeys) XXX_Size() int {
+ return xxx_messageInfo_EncryptionKeys.Size(m)
+}
+func (m *EncryptionKeys) XXX_DiscardUnknown() {
+ xxx_messageInfo_EncryptionKeys.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_EncryptionKeys proto.InternalMessageInfo // target of the XXX_ method calls above
+
+func (m *EncryptionKeys) GetKey() string { // GetKey returns *m.Key, or "" when m or the field is nil.
+ if m != nil && m.Key != nil {
+ return *m.Key
+ }
+ return ""
+}
+
+func (m *EncryptionKeys) GetValue() []byte { // GetValue returns m.Value, or nil when m is nil.
+ if m != nil {
+ return m.Value
+ }
+ return nil
+}
+
+func (m *EncryptionKeys) GetMetadata() []*KeyValue { // GetMetadata returns m.Metadata, or nil when m is nil.
+ if m != nil {
+ return m.Metadata
+ }
+ return nil
+}
+
+type MessageMetadata struct {
+ ProducerName *string `protobuf:"bytes,1,req,name=producer_name,json=producerName" json:"producer_name,omitempty"`
+ SequenceId *uint64 `protobuf:"varint,2,req,name=sequence_id,json=sequenceId" json:"sequence_id,omitempty"`
+ PublishTime *uint64 `protobuf:"varint,3,req,name=publish_time,json=publishTime" json:"publish_time,omitempty"`
+ Properties []*KeyValue `protobuf:"bytes,4,rep,name=properties" json:"properties,omitempty"`
+ // Property set on replicated message,
+ // includes the source cluster name
+ ReplicatedFrom *string `protobuf:"bytes,5,opt,name=replicated_from,json=replicatedFrom" json:"replicated_from,omitempty"`
+ // key to decide partition for the msg
+ PartitionKey *string `protobuf:"bytes,6,opt,name=partition_key,json=partitionKey" json:"partition_key,omitempty"`
+ // Override namespace's replication
+ ReplicateTo []string `protobuf:"bytes,7,rep,name=replicate_to,json=replicateTo" json:"replicate_to,omitempty"`
+ Compression *CompressionType `protobuf:"varint,8,opt,name=compression,enum=pulsar.proto.CompressionType,def=0" json:"compression,omitempty"`
+ UncompressedSize *uint32 `protobuf:"varint,9,opt,name=uncompressed_size,json=uncompressedSize,def=0" json:"uncompressed_size,omitempty"`
+ // Removed below checksum field from Metadata as
+ // it should be part of send-command which keeps checksum of header + payload
+ // optional sfixed64 checksum = 10;
+ // differentiate single and batch message metadata
+ NumMessagesInBatch *int32 `protobuf:"varint,11,opt,name=num_messages_in_batch,json=numMessagesInBatch,def=1" json:"num_messages_in_batch,omitempty"`
+ // the timestamp that this event occurs. it is typically set by applications.
+ // if this field is omitted, `publish_time` can be used for the purpose of `event_time`.
+ EventTime *uint64 `protobuf:"varint,12,opt,name=event_time,json=eventTime,def=0" json:"event_time,omitempty"`
+ // Contains encryption key name, encrypted key and metadata to describe the key
+ EncryptionKeys []*EncryptionKeys `protobuf:"bytes,13,rep,name=encryption_keys,json=encryptionKeys" json:"encryption_keys,omitempty"`
+ // Algorithm used to encrypt data key
+ EncryptionAlgo *string `protobuf:"bytes,14,opt,name=encryption_algo,json=encryptionAlgo" json:"encryption_algo,omitempty"`
+ // Additional parameters required by encryption
+ EncryptionParam []byte `protobuf:"bytes,15,opt,name=encryption_param,json=encryptionParam" json:"encryption_param,omitempty"`
+ SchemaVersion []byte `protobuf:"bytes,16,opt,name=schema_version,json=schemaVersion" json:"schema_version,omitempty"`
+ PartitionKeyB64Encoded *bool `protobuf:"varint,17,opt,name=partition_key_b64_encoded,json=partitionKeyB64Encoded,def=0" json:"partition_key_b64_encoded,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+// xxx_messageInfo_MessageMetadata is the proto.InternalMessageInfo the XXX_* methods of MessageMetadata delegate to.
+var xxx_messageInfo_MessageMetadata proto.InternalMessageInfo
+
+// Default_MessageMetadata_Compression is what GetCompression returns when the field is unset.
+const Default_MessageMetadata_Compression CompressionType = CompressionType_NONE
+
+// Default_MessageMetadata_UncompressedSize is what GetUncompressedSize returns when the field is unset.
+const Default_MessageMetadata_UncompressedSize uint32 = 0
+
+// Default_MessageMetadata_NumMessagesInBatch is what GetNumMessagesInBatch returns when the field is unset.
+const Default_MessageMetadata_NumMessagesInBatch int32 = 1
+
+// Default_MessageMetadata_EventTime is what GetEventTime returns when the field is unset.
+const Default_MessageMetadata_EventTime uint64 = 0
+
+// Default_MessageMetadata_PartitionKeyB64Encoded is what GetPartitionKeyB64Encoded returns when the field is unset.
+const Default_MessageMetadata_PartitionKeyB64Encoded bool = false
+
+// Reset replaces *m with an empty MessageMetadata.
+func (m *MessageMetadata) Reset() {
+	*m = MessageMetadata{}
+}
+
+// String returns proto.CompactTextString(m).
+func (m *MessageMetadata) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage has an empty body.
+func (*MessageMetadata) ProtoMessage() {}
+
+// Descriptor returns fileDescriptor_PulsarApi_9ff3bd5e091a4809 and the index path []int{5}.
+func (*MessageMetadata) Descriptor() ([]byte, []int) {
+	path := []int{5}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal delegates to xxx_messageInfo_MessageMetadata.Unmarshal with b as input.
+func (m *MessageMetadata) XXX_Unmarshal(b []byte) error {
+	info := &xxx_messageInfo_MessageMetadata
+	return info.Unmarshal(m, b)
+}
+
+// XXX_Marshal delegates to xxx_messageInfo_MessageMetadata.Marshal, forwarding b and deterministic.
+func (m *MessageMetadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	info := &xxx_messageInfo_MessageMetadata
+	return info.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge delegates to xxx_messageInfo_MessageMetadata.Merge with the receiver as destination.
+func (m *MessageMetadata) XXX_Merge(src proto.Message) {
+	info := &xxx_messageInfo_MessageMetadata
+	info.Merge(m, src)
+}
+
+// XXX_Size returns xxx_messageInfo_MessageMetadata.Size(m).
+func (m *MessageMetadata) XXX_Size() int {
+	info := &xxx_messageInfo_MessageMetadata
+	return info.Size(m)
+}
+
+// XXX_DiscardUnknown delegates to xxx_messageInfo_MessageMetadata.DiscardUnknown.
+func (m *MessageMetadata) XXX_DiscardUnknown() {
+	info := &xxx_messageInfo_MessageMetadata
+	info.DiscardUnknown(m)
+}
+
+// GetProducerName returns the ProducerName field, or "" when the receiver or the field is nil.
+func (m *MessageMetadata) GetProducerName() string {
+	if m == nil || m.ProducerName == nil {
+		return ""
+	}
+	return *m.ProducerName
+}
+
+// GetSequenceId returns the SequenceId field, or 0 when the receiver or the field is nil.
+func (m *MessageMetadata) GetSequenceId() uint64 {
+	if m == nil || m.SequenceId == nil {
+		return 0
+	}
+	return *m.SequenceId
+}
+
+// GetPublishTime returns the PublishTime field, or 0 when the receiver or the field is nil.
+func (m *MessageMetadata) GetPublishTime() uint64 {
+	if m == nil || m.PublishTime == nil {
+		return 0
+	}
+	return *m.PublishTime
+}
+
+// GetProperties returns the Properties slice, or nil when the receiver is nil.
+func (m *MessageMetadata) GetProperties() []*KeyValue {
+	if m == nil {
+		return nil
+	}
+	return m.Properties
+}
+
+// GetReplicatedFrom returns the ReplicatedFrom field, or "" when the receiver or the field is nil.
+func (m *MessageMetadata) GetReplicatedFrom() string {
+	if m == nil || m.ReplicatedFrom == nil {
+		return ""
+	}
+	return *m.ReplicatedFrom
+}
+
+// GetPartitionKey returns the PartitionKey field, or "" when the receiver or the field is nil.
+func (m *MessageMetadata) GetPartitionKey() string {
+	if m == nil || m.PartitionKey == nil {
+		return ""
+	}
+	return *m.PartitionKey
+}
+
+// GetReplicateTo returns the ReplicateTo slice, or nil when the receiver is nil.
+func (m *MessageMetadata) GetReplicateTo() []string {
+	if m == nil {
+		return nil
+	}
+	return m.ReplicateTo
+}
+
+// GetCompression returns the Compression field, or Default_MessageMetadata_Compression
+// when the receiver or the field is nil.
+func (m *MessageMetadata) GetCompression() CompressionType {
+	if m == nil || m.Compression == nil {
+		return Default_MessageMetadata_Compression
+	}
+	return *m.Compression
+}
+
+// GetUncompressedSize returns the UncompressedSize field, or Default_MessageMetadata_UncompressedSize
+// when the receiver or the field is nil.
+func (m *MessageMetadata) GetUncompressedSize() uint32 {
+	if m == nil || m.UncompressedSize == nil {
+		return Default_MessageMetadata_UncompressedSize
+	}
+	return *m.UncompressedSize
+}
+
+// GetNumMessagesInBatch returns the NumMessagesInBatch field, or
+// Default_MessageMetadata_NumMessagesInBatch when the receiver or the field is nil.
+func (m *MessageMetadata) GetNumMessagesInBatch() int32 {
+	if m == nil || m.NumMessagesInBatch == nil {
+		return Default_MessageMetadata_NumMessagesInBatch
+	}
+	return *m.NumMessagesInBatch
+}
+
+// GetEventTime returns the EventTime field, or Default_MessageMetadata_EventTime
+// when the receiver or the field is nil.
+func (m *MessageMetadata) GetEventTime() uint64 {
+	if m == nil || m.EventTime == nil {
+		return Default_MessageMetadata_EventTime
+	}
+	return *m.EventTime
+}
+
+// GetEncryptionKeys returns the EncryptionKeys slice, or nil when the receiver is nil.
+func (m *MessageMetadata) GetEncryptionKeys() []*EncryptionKeys {
+	if m == nil {
+		return nil
+	}
+	return m.EncryptionKeys
+}
+
+// GetEncryptionAlgo returns the EncryptionAlgo field, or "" when the receiver or the field is nil.
+func (m *MessageMetadata) GetEncryptionAlgo() string {
+	if m == nil || m.EncryptionAlgo == nil {
+		return ""
+	}
+	return *m.EncryptionAlgo
+}
+
+// GetEncryptionParam returns the EncryptionParam bytes, or nil when the receiver is nil.
+func (m *MessageMetadata) GetEncryptionParam() []byte {
+	if m == nil {
+		return nil
+	}
+	return m.EncryptionParam
+}
+
+// GetSchemaVersion returns the SchemaVersion bytes, or nil when the receiver is nil.
+func (m *MessageMetadata) GetSchemaVersion() []byte {
+	if m == nil {
+		return nil
+	}
+	return m.SchemaVersion
+}
+
+// GetPartitionKeyB64Encoded returns the PartitionKeyB64Encoded field, or
+// Default_MessageMetadata_PartitionKeyB64Encoded when the receiver or the field is nil.
+func (m *MessageMetadata) GetPartitionKeyB64Encoded() bool {
+	if m == nil || m.PartitionKeyB64Encoded == nil {
+		return Default_MessageMetadata_PartitionKeyB64Encoded
+	}
+	return *m.PartitionKeyB64Encoded
+}
+// SingleMessageMetadata is the generated Go struct for the SingleMessageMetadata protobuf message (descriptor index 6); pointer-typed scalars are nil when unset and the Get* accessors supply the defaults.
+type SingleMessageMetadata struct {
+ Properties []*KeyValue `protobuf:"bytes,1,rep,name=properties" json:"properties,omitempty"`
+ PartitionKey *string `protobuf:"bytes,2,opt,name=partition_key,json=partitionKey" json:"partition_key,omitempty"`
+ PayloadSize *int32 `protobuf:"varint,3,req,name=payload_size,json=payloadSize" json:"payload_size,omitempty"` // the only field tagged "req" in this message
+ CompactedOut *bool `protobuf:"varint,4,opt,name=compacted_out,json=compactedOut,def=0" json:"compacted_out,omitempty"`
+ // The timestamp at which this event occurred; it is typically set by applications.
+ // If this field is omitted, `publish_time` can be used in place of `event_time`.
+ EventTime *uint64 `protobuf:"varint,5,opt,name=event_time,json=eventTime,def=0" json:"event_time,omitempty"`
+ PartitionKeyB64Encoded *bool `protobuf:"varint,6,opt,name=partition_key_b64_encoded,json=partitionKeyB64Encoded,def=0" json:"partition_key_b64_encoded,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+// xxx_messageInfo_SingleMessageMetadata is the proto.InternalMessageInfo the XXX_* methods of SingleMessageMetadata delegate to.
+var xxx_messageInfo_SingleMessageMetadata proto.InternalMessageInfo
+
+// Default_SingleMessageMetadata_CompactedOut is what GetCompactedOut returns when the field is unset.
+const Default_SingleMessageMetadata_CompactedOut bool = false
+
+// Default_SingleMessageMetadata_EventTime is what GetEventTime returns when the field is unset.
+const Default_SingleMessageMetadata_EventTime uint64 = 0
+
+// Default_SingleMessageMetadata_PartitionKeyB64Encoded is what GetPartitionKeyB64Encoded returns when the field is unset.
+const Default_SingleMessageMetadata_PartitionKeyB64Encoded bool = false
+
+// Reset replaces *m with an empty SingleMessageMetadata.
+func (m *SingleMessageMetadata) Reset() {
+	*m = SingleMessageMetadata{}
+}
+
+// String returns proto.CompactTextString(m).
+func (m *SingleMessageMetadata) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage has an empty body.
+func (*SingleMessageMetadata) ProtoMessage() {}
+
+// Descriptor returns fileDescriptor_PulsarApi_9ff3bd5e091a4809 and the index path []int{6}.
+func (*SingleMessageMetadata) Descriptor() ([]byte, []int) {
+	path := []int{6}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal delegates to xxx_messageInfo_SingleMessageMetadata.Unmarshal with b as input.
+func (m *SingleMessageMetadata) XXX_Unmarshal(b []byte) error {
+	info := &xxx_messageInfo_SingleMessageMetadata
+	return info.Unmarshal(m, b)
+}
+
+// XXX_Marshal delegates to xxx_messageInfo_SingleMessageMetadata.Marshal, forwarding b and deterministic.
+func (m *SingleMessageMetadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	info := &xxx_messageInfo_SingleMessageMetadata
+	return info.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge delegates to xxx_messageInfo_SingleMessageMetadata.Merge with the receiver as destination.
+func (m *SingleMessageMetadata) XXX_Merge(src proto.Message) {
+	info := &xxx_messageInfo_SingleMessageMetadata
+	info.Merge(m, src)
+}
+
+// XXX_Size returns xxx_messageInfo_SingleMessageMetadata.Size(m).
+func (m *SingleMessageMetadata) XXX_Size() int {
+	info := &xxx_messageInfo_SingleMessageMetadata
+	return info.Size(m)
+}
+
+// XXX_DiscardUnknown delegates to xxx_messageInfo_SingleMessageMetadata.DiscardUnknown.
+func (m *SingleMessageMetadata) XXX_DiscardUnknown() {
+	info := &xxx_messageInfo_SingleMessageMetadata
+	info.DiscardUnknown(m)
+}
+
+// GetProperties returns the Properties slice, or nil when the receiver is nil.
+func (m *SingleMessageMetadata) GetProperties() []*KeyValue {
+	if m == nil {
+		return nil
+	}
+	return m.Properties
+}
+
+// GetPartitionKey returns the PartitionKey field, or "" when the receiver or the field is nil.
+func (m *SingleMessageMetadata) GetPartitionKey() string {
+	if m == nil || m.PartitionKey == nil {
+		return ""
+	}
+	return *m.PartitionKey
+}
+
+// GetPayloadSize returns the PayloadSize field, or 0 when the receiver or the field is nil.
+func (m *SingleMessageMetadata) GetPayloadSize() int32 {
+	if m == nil || m.PayloadSize == nil {
+		return 0
+	}
+	return *m.PayloadSize
+}
+
+// GetCompactedOut returns the CompactedOut field, or Default_SingleMessageMetadata_CompactedOut
+// when the receiver or the field is nil.
+func (m *SingleMessageMetadata) GetCompactedOut() bool {
+	if m == nil || m.CompactedOut == nil {
+		return Default_SingleMessageMetadata_CompactedOut
+	}
+	return *m.CompactedOut
+}
+
+// GetEventTime returns the EventTime field, or Default_SingleMessageMetadata_EventTime
+// when the receiver or the field is nil.
+func (m *SingleMessageMetadata) GetEventTime() uint64 {
+	if m == nil || m.EventTime == nil {
+		return Default_SingleMessageMetadata_EventTime
+	}
+	return *m.EventTime
+}
+
+// GetPartitionKeyB64Encoded returns the PartitionKeyB64Encoded field, or
+// Default_SingleMessageMetadata_PartitionKeyB64Encoded when the receiver or the field is nil.
+func (m *SingleMessageMetadata) GetPartitionKeyB64Encoded() bool {
+	if m == nil || m.PartitionKeyB64Encoded == nil {
+		return Default_SingleMessageMetadata_PartitionKeyB64Encoded
+	}
+	return *m.PartitionKeyB64Encoded
+}
+// CommandConnect is the generated Go struct for the pulsar.proto CommandConnect message (descriptor index 7); pointer-typed scalars are nil when unset.
+type CommandConnect struct {
+ ClientVersion *string `protobuf:"bytes,1,req,name=client_version,json=clientVersion" json:"client_version,omitempty"`
+ AuthMethod *AuthMethod `protobuf:"varint,2,opt,name=auth_method,json=authMethod,enum=pulsar.proto.AuthMethod" json:"auth_method,omitempty"`
+ AuthMethodName *string `protobuf:"bytes,5,opt,name=auth_method_name,json=authMethodName" json:"auth_method_name,omitempty"`
+ AuthData []byte `protobuf:"bytes,3,opt,name=auth_data,json=authData" json:"auth_data,omitempty"`
+ ProtocolVersion *int32 `protobuf:"varint,4,opt,name=protocol_version,json=protocolVersion,def=0" json:"protocol_version,omitempty"`
+ // Client can ask to be proxied to a specific broker.
+ // This is only honored by a Pulsar proxy.
+ ProxyToBrokerUrl *string `protobuf:"bytes,6,opt,name=proxy_to_broker_url,json=proxyToBrokerUrl" json:"proxy_to_broker_url,omitempty"`
+ // Original principal that was verified by
+ // a Pulsar proxy. In this case the auth info above
+ // will be the auth of the proxy itself.
+ OriginalPrincipal *string `protobuf:"bytes,7,opt,name=original_principal,json=originalPrincipal" json:"original_principal,omitempty"`
+ // Original auth role and auth method that were passed
+ // to the proxy. In this case the auth info above
+ // will be the auth of the proxy itself.
+ OriginalAuthData *string `protobuf:"bytes,8,opt,name=original_auth_data,json=originalAuthData" json:"original_auth_data,omitempty"`
+ OriginalAuthMethod *string `protobuf:"bytes,9,opt,name=original_auth_method,json=originalAuthMethod" json:"original_auth_method,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+// xxx_messageInfo_CommandConnect is the proto.InternalMessageInfo the XXX_* methods of CommandConnect delegate to.
+var xxx_messageInfo_CommandConnect proto.InternalMessageInfo
+
+// Default_CommandConnect_ProtocolVersion is what GetProtocolVersion returns when the field is unset.
+const Default_CommandConnect_ProtocolVersion int32 = 0
+
+// Reset replaces *m with an empty CommandConnect.
+func (m *CommandConnect) Reset() {
+	*m = CommandConnect{}
+}
+
+// String returns proto.CompactTextString(m).
+func (m *CommandConnect) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage has an empty body.
+func (*CommandConnect) ProtoMessage() {}
+
+// Descriptor returns fileDescriptor_PulsarApi_9ff3bd5e091a4809 and the index path []int{7}.
+func (*CommandConnect) Descriptor() ([]byte, []int) {
+	path := []int{7}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal delegates to xxx_messageInfo_CommandConnect.Unmarshal with b as input.
+func (m *CommandConnect) XXX_Unmarshal(b []byte) error {
+	info := &xxx_messageInfo_CommandConnect
+	return info.Unmarshal(m, b)
+}
+
+// XXX_Marshal delegates to xxx_messageInfo_CommandConnect.Marshal, forwarding b and deterministic.
+func (m *CommandConnect) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	info := &xxx_messageInfo_CommandConnect
+	return info.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge delegates to xxx_messageInfo_CommandConnect.Merge with the receiver as destination.
+func (m *CommandConnect) XXX_Merge(src proto.Message) {
+	info := &xxx_messageInfo_CommandConnect
+	info.Merge(m, src)
+}
+
+// XXX_Size returns xxx_messageInfo_CommandConnect.Size(m).
+func (m *CommandConnect) XXX_Size() int {
+	info := &xxx_messageInfo_CommandConnect
+	return info.Size(m)
+}
+
+// XXX_DiscardUnknown delegates to xxx_messageInfo_CommandConnect.DiscardUnknown.
+func (m *CommandConnect) XXX_DiscardUnknown() {
+	info := &xxx_messageInfo_CommandConnect
+	info.DiscardUnknown(m)
+}
+
+// GetClientVersion returns the ClientVersion field, or "" when the receiver or the field is nil.
+func (m *CommandConnect) GetClientVersion() string {
+	if m == nil || m.ClientVersion == nil {
+		return ""
+	}
+	return *m.ClientVersion
+}
+
+// GetAuthMethod returns the AuthMethod field, or AuthMethod_AuthMethodNone
+// when the receiver or the field is nil.
+func (m *CommandConnect) GetAuthMethod() AuthMethod {
+	if m == nil || m.AuthMethod == nil {
+		return AuthMethod_AuthMethodNone
+	}
+	return *m.AuthMethod
+}
+
+// GetAuthMethodName returns the AuthMethodName field, or "" when the receiver or the field is nil.
+func (m *CommandConnect) GetAuthMethodName() string {
+	if m == nil || m.AuthMethodName == nil {
+		return ""
+	}
+	return *m.AuthMethodName
+}
+
+// GetAuthData returns the AuthData bytes, or nil when the receiver is nil.
+func (m *CommandConnect) GetAuthData() []byte {
+	if m == nil {
+		return nil
+	}
+	return m.AuthData
+}
+
+// GetProtocolVersion returns the ProtocolVersion field, or Default_CommandConnect_ProtocolVersion
+// when the receiver or the field is nil.
+func (m *CommandConnect) GetProtocolVersion() int32 {
+	if m == nil || m.ProtocolVersion == nil {
+		return Default_CommandConnect_ProtocolVersion
+	}
+	return *m.ProtocolVersion
+}
+
+// GetProxyToBrokerUrl returns the ProxyToBrokerUrl field, or "" when the receiver or the field is nil.
+func (m *CommandConnect) GetProxyToBrokerUrl() string {
+	if m == nil || m.ProxyToBrokerUrl == nil {
+		return ""
+	}
+	return *m.ProxyToBrokerUrl
+}
+
+// GetOriginalPrincipal returns the OriginalPrincipal field, or "" when the receiver or the field is nil.
+func (m *CommandConnect) GetOriginalPrincipal() string {
+	if m == nil || m.OriginalPrincipal == nil {
+		return ""
+	}
+	return *m.OriginalPrincipal
+}
+
+// GetOriginalAuthData returns the OriginalAuthData field, or "" when the receiver or the field is nil.
+func (m *CommandConnect) GetOriginalAuthData() string {
+	if m == nil || m.OriginalAuthData == nil {
+		return ""
+	}
+	return *m.OriginalAuthData
+}
+
+// GetOriginalAuthMethod returns the OriginalAuthMethod field, or "" when the receiver or the field is nil.
+func (m *CommandConnect) GetOriginalAuthMethod() string {
+	if m == nil || m.OriginalAuthMethod == nil {
+		return ""
+	}
+	return *m.OriginalAuthMethod
+}
+// CommandConnected is the generated Go struct for the CommandConnected protobuf message (descriptor index 8); pointer-typed scalars are nil when unset.
+type CommandConnected struct {
+ ServerVersion *string `protobuf:"bytes,1,req,name=server_version,json=serverVersion" json:"server_version,omitempty"`
+ ProtocolVersion *int32 `protobuf:"varint,2,opt,name=protocol_version,json=protocolVersion,def=0" json:"protocol_version,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+// xxx_messageInfo_CommandConnected is the proto.InternalMessageInfo the XXX_* methods of CommandConnected delegate to.
+var xxx_messageInfo_CommandConnected proto.InternalMessageInfo
+
+// Default_CommandConnected_ProtocolVersion is what GetProtocolVersion returns when the field is unset.
+const Default_CommandConnected_ProtocolVersion int32 = 0
+
+// Reset replaces *m with an empty CommandConnected.
+func (m *CommandConnected) Reset() {
+	*m = CommandConnected{}
+}
+
+// String returns proto.CompactTextString(m).
+func (m *CommandConnected) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage has an empty body.
+func (*CommandConnected) ProtoMessage() {}
+
+// Descriptor returns fileDescriptor_PulsarApi_9ff3bd5e091a4809 and the index path []int{8}.
+func (*CommandConnected) Descriptor() ([]byte, []int) {
+	path := []int{8}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal delegates to xxx_messageInfo_CommandConnected.Unmarshal with b as input.
+func (m *CommandConnected) XXX_Unmarshal(b []byte) error {
+	info := &xxx_messageInfo_CommandConnected
+	return info.Unmarshal(m, b)
+}
+
+// XXX_Marshal delegates to xxx_messageInfo_CommandConnected.Marshal, forwarding b and deterministic.
+func (m *CommandConnected) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	info := &xxx_messageInfo_CommandConnected
+	return info.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge delegates to xxx_messageInfo_CommandConnected.Merge with the receiver as destination.
+func (m *CommandConnected) XXX_Merge(src proto.Message) {
+	info := &xxx_messageInfo_CommandConnected
+	info.Merge(m, src)
+}
+
+// XXX_Size returns xxx_messageInfo_CommandConnected.Size(m).
+func (m *CommandConnected) XXX_Size() int {
+	info := &xxx_messageInfo_CommandConnected
+	return info.Size(m)
+}
+
+// XXX_DiscardUnknown delegates to xxx_messageInfo_CommandConnected.DiscardUnknown.
+func (m *CommandConnected) XXX_DiscardUnknown() {
+	info := &xxx_messageInfo_CommandConnected
+	info.DiscardUnknown(m)
+}
+
+// GetServerVersion returns the ServerVersion field, or "" when the receiver or the field is nil.
+func (m *CommandConnected) GetServerVersion() string {
+	if m == nil || m.ServerVersion == nil {
+		return ""
+	}
+	return *m.ServerVersion
+}
+
+// GetProtocolVersion returns the ProtocolVersion field, or Default_CommandConnected_ProtocolVersion
+// when the receiver or the field is nil.
+func (m *CommandConnected) GetProtocolVersion() int32 {
+	if m == nil || m.ProtocolVersion == nil {
+		return Default_CommandConnected_ProtocolVersion
+	}
+	return *m.ProtocolVersion
+}
+// CommandSubscribe is the generated Go struct for the pulsar.proto CommandSubscribe message (descriptor index 9); pointer-typed scalars are nil when unset and the Get* accessors supply the defaults.
+type CommandSubscribe struct {
+ Topic *string `protobuf:"bytes,1,req,name=topic" json:"topic,omitempty"`
+ Subscription *string `protobuf:"bytes,2,req,name=subscription" json:"subscription,omitempty"`
+ SubType *CommandSubscribe_SubType `protobuf:"varint,3,req,name=subType,enum=pulsar.proto.CommandSubscribe_SubType" json:"subType,omitempty"`
+ ConsumerId *uint64 `protobuf:"varint,4,req,name=consumer_id,json=consumerId" json:"consumer_id,omitempty"`
+ RequestId *uint64 `protobuf:"varint,5,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+ ConsumerName *string `protobuf:"bytes,6,opt,name=consumer_name,json=consumerName" json:"consumer_name,omitempty"`
+ PriorityLevel *int32 `protobuf:"varint,7,opt,name=priority_level,json=priorityLevel" json:"priority_level,omitempty"`
+ // Signals whether the subscription should be backed by a
+ // durable cursor or not.
+ Durable *bool `protobuf:"varint,8,opt,name=durable,def=1" json:"durable,omitempty"`
+ // If specified, the subscription will position the cursor's
+ // mark-delete position on the particular message id and
+ // will send messages from that point.
+ StartMessageId *MessageIdData `protobuf:"bytes,9,opt,name=start_message_id,json=startMessageId" json:"start_message_id,omitempty"`
+ // Add optional metadata key=value to this consumer.
+ Metadata []*KeyValue `protobuf:"bytes,10,rep,name=metadata" json:"metadata,omitempty"`
+ ReadCompacted *bool `protobuf:"varint,11,opt,name=read_compacted,json=readCompacted" json:"read_compacted,omitempty"`
+ Schema *Schema `protobuf:"bytes,12,opt,name=schema" json:"schema,omitempty"`
+ // Signals whether the subscription will initialize on latest
+ // or not -- earliest.
+ InitialPosition *CommandSubscribe_InitialPosition `protobuf:"varint,13,opt,name=initialPosition,enum=pulsar.proto.CommandSubscribe_InitialPosition,def=0" json:"initialPosition,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+// xxx_messageInfo_CommandSubscribe is the proto.InternalMessageInfo the XXX_* methods of CommandSubscribe delegate to.
+var xxx_messageInfo_CommandSubscribe proto.InternalMessageInfo
+
+// Default_CommandSubscribe_Durable is what GetDurable returns when the field is unset.
+const Default_CommandSubscribe_Durable bool = true
+
+// Default_CommandSubscribe_InitialPosition is what GetInitialPosition returns when the field is unset.
+const Default_CommandSubscribe_InitialPosition CommandSubscribe_InitialPosition = CommandSubscribe_Latest
+
+// Reset replaces *m with an empty CommandSubscribe.
+func (m *CommandSubscribe) Reset() {
+	*m = CommandSubscribe{}
+}
+
+// String returns proto.CompactTextString(m).
+func (m *CommandSubscribe) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage has an empty body.
+func (*CommandSubscribe) ProtoMessage() {}
+
+// Descriptor returns fileDescriptor_PulsarApi_9ff3bd5e091a4809 and the index path []int{9}.
+func (*CommandSubscribe) Descriptor() ([]byte, []int) {
+	path := []int{9}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal delegates to xxx_messageInfo_CommandSubscribe.Unmarshal with b as input.
+func (m *CommandSubscribe) XXX_Unmarshal(b []byte) error {
+	info := &xxx_messageInfo_CommandSubscribe
+	return info.Unmarshal(m, b)
+}
+
+// XXX_Marshal delegates to xxx_messageInfo_CommandSubscribe.Marshal, forwarding b and deterministic.
+func (m *CommandSubscribe) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	info := &xxx_messageInfo_CommandSubscribe
+	return info.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge delegates to xxx_messageInfo_CommandSubscribe.Merge with the receiver as destination.
+func (m *CommandSubscribe) XXX_Merge(src proto.Message) {
+	info := &xxx_messageInfo_CommandSubscribe
+	info.Merge(m, src)
+}
+
+// XXX_Size returns xxx_messageInfo_CommandSubscribe.Size(m).
+func (m *CommandSubscribe) XXX_Size() int {
+	info := &xxx_messageInfo_CommandSubscribe
+	return info.Size(m)
+}
+
+// XXX_DiscardUnknown delegates to xxx_messageInfo_CommandSubscribe.DiscardUnknown.
+func (m *CommandSubscribe) XXX_DiscardUnknown() {
+	info := &xxx_messageInfo_CommandSubscribe
+	info.DiscardUnknown(m)
+}
+
+// GetTopic returns the Topic field, or "" when the receiver or the field is nil.
+func (m *CommandSubscribe) GetTopic() string {
+	if m == nil || m.Topic == nil {
+		return ""
+	}
+	return *m.Topic
+}
+
+// GetSubscription returns the Subscription field, or "" when the receiver or the field is nil.
+func (m *CommandSubscribe) GetSubscription() string {
+	if m == nil || m.Subscription == nil {
+		return ""
+	}
+	return *m.Subscription
+}
+
+// GetSubType returns the SubType field, or CommandSubscribe_Exclusive
+// when the receiver or the field is nil.
+func (m *CommandSubscribe) GetSubType() CommandSubscribe_SubType {
+	if m == nil || m.SubType == nil {
+		return CommandSubscribe_Exclusive
+	}
+	return *m.SubType
+}
+
+// GetConsumerId returns the ConsumerId field, or 0 when the receiver or the field is nil.
+func (m *CommandSubscribe) GetConsumerId() uint64 {
+	if m == nil || m.ConsumerId == nil {
+		return 0
+	}
+	return *m.ConsumerId
+}
+
+// GetRequestId returns the RequestId field, or 0 when the receiver or the field is nil.
+func (m *CommandSubscribe) GetRequestId() uint64 {
+	if m == nil || m.RequestId == nil {
+		return 0
+	}
+	return *m.RequestId
+}
+
+// GetConsumerName returns the ConsumerName field, or "" when the receiver or the field is nil.
+func (m *CommandSubscribe) GetConsumerName() string {
+	if m == nil || m.ConsumerName == nil {
+		return ""
+	}
+	return *m.ConsumerName
+}
+
+// GetPriorityLevel returns the PriorityLevel field, or 0 when the receiver or the field is nil.
+func (m *CommandSubscribe) GetPriorityLevel() int32 {
+	if m == nil || m.PriorityLevel == nil {
+		return 0
+	}
+	return *m.PriorityLevel
+}
+
+// GetDurable returns the Durable field, or Default_CommandSubscribe_Durable
+// when the receiver or the field is nil.
+func (m *CommandSubscribe) GetDurable() bool {
+	if m == nil || m.Durable == nil {
+		return Default_CommandSubscribe_Durable
+	}
+	return *m.Durable
+}
+
+// GetStartMessageId returns the StartMessageId pointer, or nil when the receiver is nil.
+func (m *CommandSubscribe) GetStartMessageId() *MessageIdData {
+	if m == nil {
+		return nil
+	}
+	return m.StartMessageId
+}
+
+// GetMetadata returns the Metadata slice, or nil when the receiver is nil.
+func (m *CommandSubscribe) GetMetadata() []*KeyValue {
+	if m == nil {
+		return nil
+	}
+	return m.Metadata
+}
+
+// GetReadCompacted returns the ReadCompacted field, or false when the receiver or the field is nil.
+func (m *CommandSubscribe) GetReadCompacted() bool {
+	if m == nil || m.ReadCompacted == nil {
+		return false
+	}
+	return *m.ReadCompacted
+}
+
+// GetSchema returns the Schema pointer, or nil when the receiver is nil.
+func (m *CommandSubscribe) GetSchema() *Schema {
+	if m == nil {
+		return nil
+	}
+	return m.Schema
+}
+
+// GetInitialPosition returns the InitialPosition field, or Default_CommandSubscribe_InitialPosition
+// when the receiver or the field is nil.
+func (m *CommandSubscribe) GetInitialPosition() CommandSubscribe_InitialPosition {
+	if m == nil || m.InitialPosition == nil {
+		return Default_CommandSubscribe_InitialPosition
+	}
+	return *m.InitialPosition
+}
+// CommandPartitionedTopicMetadata is the generated Go struct for the CommandPartitionedTopicMetadata protobuf message (descriptor index 10); pointer-typed scalars are nil when unset.
+type CommandPartitionedTopicMetadata struct {
+ Topic *string `protobuf:"bytes,1,req,name=topic" json:"topic,omitempty"`
+ RequestId *uint64 `protobuf:"varint,2,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+ // TODO - Remove original_principal, original_auth_data, original_auth_method
+ // Original principal that was verified by
+ // a Pulsar proxy.
+ OriginalPrincipal *string `protobuf:"bytes,3,opt,name=original_principal,json=originalPrincipal" json:"original_principal,omitempty"`
+ // Original auth role and auth method that were passed
+ // to the proxy.
+ OriginalAuthData *string `protobuf:"bytes,4,opt,name=original_auth_data,json=originalAuthData" json:"original_auth_data,omitempty"`
+ OriginalAuthMethod *string `protobuf:"bytes,5,opt,name=original_auth_method,json=originalAuthMethod" json:"original_auth_method,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *CommandPartitionedTopicMetadata) Reset() { *m = CommandPartitionedTopicMetadata{} }
+func (m *CommandPartitionedTopicMetadata) String() string { return proto.CompactTextString(m) }
+func (*CommandPartitionedTopicMetadata) ProtoMessage() {}
+func (*CommandPartitionedTopicMetadata) Descriptor() ([]byte, []int) {
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{10}
+}
+func (m *CommandPartitionedTopicMetadata) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_CommandPartitionedTopicMetadata.Unmarshal(m, b)
+}
+func (m *CommandPartitionedTopicMetadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandPartitionedTopicMetadata.Marshal(b, m, deterministic)
+}
+func (dst *CommandPartitionedTopicMetadata) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandPartitionedTopicMetadata.Merge(dst, src)
+}
+func (m *CommandPartitionedTopicMetadata) XXX_Size() int {
+ return xxx_messageInfo_CommandPartitionedTopicMetadata.Size(m)
+}
+func (m *CommandPartitionedTopicMetadata) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandPartitionedTopicMetadata.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_CommandPartitionedTopicMetadata proto.InternalMessageInfo
+
+// GetTopic returns the Topic field, or "" when the receiver or the field is nil.
+func (m *CommandPartitionedTopicMetadata) GetTopic() string {
+	if m == nil || m.Topic == nil {
+		return ""
+	}
+	return *m.Topic
+}
+
+// GetRequestId returns the RequestId field, or 0 when the receiver or the field is nil.
+func (m *CommandPartitionedTopicMetadata) GetRequestId() uint64 {
+	if m == nil || m.RequestId == nil {
+		return 0
+	}
+	return *m.RequestId
+}
+
+// GetOriginalPrincipal returns the OriginalPrincipal field, or "" when the receiver or the field is nil.
+func (m *CommandPartitionedTopicMetadata) GetOriginalPrincipal() string {
+	if m == nil || m.OriginalPrincipal == nil {
+		return ""
+	}
+	return *m.OriginalPrincipal
+}
+
+// GetOriginalAuthData returns the OriginalAuthData field, or "" when the receiver or the field is nil.
+func (m *CommandPartitionedTopicMetadata) GetOriginalAuthData() string {
+	if m == nil || m.OriginalAuthData == nil {
+		return ""
+	}
+	return *m.OriginalAuthData
+}
+
+// GetOriginalAuthMethod returns the OriginalAuthMethod field, or "" when the receiver or the field is nil.
+func (m *CommandPartitionedTopicMetadata) GetOriginalAuthMethod() string {
+	if m == nil || m.OriginalAuthMethod == nil {
+		return ""
+	}
+	return *m.OriginalAuthMethod
+}
+// CommandPartitionedTopicMetadataResponse is the generated Go struct for the pulsar.proto CommandPartitionedTopicMetadataResponse message (descriptor index 11); pointer-typed scalars are nil when unset.
+type CommandPartitionedTopicMetadataResponse struct {
+ Partitions *uint32 `protobuf:"varint,1,opt,name=partitions" json:"partitions,omitempty"`
+ RequestId *uint64 `protobuf:"varint,2,req,name=request_id,json=requestId" json:"request_id,omitempty"` // the only field tagged "req" in this message
+ Response *CommandPartitionedTopicMetadataResponse_LookupType `protobuf:"varint,3,opt,name=response,enum=pulsar.proto.CommandPartitionedTopicMetadataResponse_LookupType" json:"response,omitempty"`
+ Error *ServerError `protobuf:"varint,4,opt,name=error,enum=pulsar.proto.ServerError" json:"error,omitempty"`
+ Message *string `protobuf:"bytes,5,opt,name=message" json:"message,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *CommandPartitionedTopicMetadataResponse) Reset() {
+ *m = CommandPartitionedTopicMetadataResponse{}
+}
+func (m *CommandPartitionedTopicMetadataResponse) String() string { return proto.CompactTextString(m) }
+func (*CommandPartitionedTopicMetadataResponse) ProtoMessage() {}
+func (*CommandPartitionedTopicMetadataResponse) Descriptor() ([]byte, []int) {
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{11}
+}
+func (m *CommandPartitionedTopicMetadataResponse) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_CommandPartitionedTopicMetadataResponse.Unmarshal(m, b)
+}
+func (m *CommandPartitionedTopicMetadataResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandPartitionedTopicMetadataResponse.Marshal(b, m, deterministic)
+}
+func (dst *CommandPartitionedTopicMetadataResponse) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandPartitionedTopicMetadataResponse.Merge(dst, src)
+}
+func (m *CommandPartitionedTopicMetadataResponse) XXX_Size() int {
+ return xxx_messageInfo_CommandPartitionedTopicMetadataResponse.Size(m)
+}
+func (m *CommandPartitionedTopicMetadataResponse) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandPartitionedTopicMetadataResponse.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_CommandPartitionedTopicMetadataResponse proto.InternalMessageInfo
+
+// GetPartitions returns the Partitions field, or 0 when the receiver or the field is nil.
+func (m *CommandPartitionedTopicMetadataResponse) GetPartitions() uint32 {
+	if m == nil || m.Partitions == nil {
+		return 0
+	}
+	return *m.Partitions
+}
+
+// GetRequestId returns the RequestId field, or 0 when the receiver or the field is nil.
+func (m *CommandPartitionedTopicMetadataResponse) GetRequestId() uint64 {
+	if m == nil || m.RequestId == nil {
+		return 0
+	}
+	return *m.RequestId
+}
+
+// GetResponse returns the Response field, or CommandPartitionedTopicMetadataResponse_Success
+// when the receiver or the field is nil.
+func (m *CommandPartitionedTopicMetadataResponse) GetResponse() CommandPartitionedTopicMetadataResponse_LookupType {
+	if m == nil || m.Response == nil {
+		return CommandPartitionedTopicMetadataResponse_Success
+	}
+	return *m.Response
+}
+
+// GetError returns the Error field, or ServerError_UnknownError
+// when the receiver or the field is nil.
+func (m *CommandPartitionedTopicMetadataResponse) GetError() ServerError {
+	if m == nil || m.Error == nil {
+		return ServerError_UnknownError
+	}
+	return *m.Error
+}
+
+// GetMessage returns the Message field, or "" when the receiver or the field is nil.
+func (m *CommandPartitionedTopicMetadataResponse) GetMessage() string {
+	if m == nil || m.Message == nil {
+		return ""
+	}
+	return *m.Message
+}
+// CommandLookupTopic is the generated Go struct for the CommandLookupTopic protobuf message (descriptor index 12); pointer-typed scalars are nil when unset and the Get* accessors supply the defaults.
+type CommandLookupTopic struct {
+ Topic *string `protobuf:"bytes,1,req,name=topic" json:"topic,omitempty"`
+ RequestId *uint64 `protobuf:"varint,2,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+ Authoritative *bool `protobuf:"varint,3,opt,name=authoritative,def=0" json:"authoritative,omitempty"`
+ // TODO - Remove original_principal, original_auth_data, original_auth_method
+ // Original principal that was verified by
+ // a Pulsar proxy.
+ OriginalPrincipal *string `protobuf:"bytes,4,opt,name=original_principal,json=originalPrincipal" json:"original_principal,omitempty"`
+ // Original auth role and auth method that were passed
+ // to the proxy.
+ OriginalAuthData *string `protobuf:"bytes,5,opt,name=original_auth_data,json=originalAuthData" json:"original_auth_data,omitempty"`
+ OriginalAuthMethod *string `protobuf:"bytes,6,opt,name=original_auth_method,json=originalAuthMethod" json:"original_auth_method,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+// xxx_messageInfo_CommandLookupTopic is the proto.InternalMessageInfo the XXX_* methods of CommandLookupTopic delegate to.
+var xxx_messageInfo_CommandLookupTopic proto.InternalMessageInfo
+
+// Default_CommandLookupTopic_Authoritative is what GetAuthoritative returns when the field is unset.
+const Default_CommandLookupTopic_Authoritative bool = false
+
+// Reset replaces *m with an empty CommandLookupTopic.
+func (m *CommandLookupTopic) Reset() {
+	*m = CommandLookupTopic{}
+}
+
+// String returns proto.CompactTextString(m).
+func (m *CommandLookupTopic) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage has an empty body.
+func (*CommandLookupTopic) ProtoMessage() {}
+
+// Descriptor returns fileDescriptor_PulsarApi_9ff3bd5e091a4809 and the index path []int{12}.
+func (*CommandLookupTopic) Descriptor() ([]byte, []int) {
+	path := []int{12}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal delegates to xxx_messageInfo_CommandLookupTopic.Unmarshal with b as input.
+func (m *CommandLookupTopic) XXX_Unmarshal(b []byte) error {
+	info := &xxx_messageInfo_CommandLookupTopic
+	return info.Unmarshal(m, b)
+}
+
+// XXX_Marshal delegates to xxx_messageInfo_CommandLookupTopic.Marshal, forwarding b and deterministic.
+func (m *CommandLookupTopic) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	info := &xxx_messageInfo_CommandLookupTopic
+	return info.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge delegates to xxx_messageInfo_CommandLookupTopic.Merge with the receiver as destination.
+func (m *CommandLookupTopic) XXX_Merge(src proto.Message) {
+	info := &xxx_messageInfo_CommandLookupTopic
+	info.Merge(m, src)
+}
+
+// XXX_Size returns xxx_messageInfo_CommandLookupTopic.Size(m).
+func (m *CommandLookupTopic) XXX_Size() int {
+	info := &xxx_messageInfo_CommandLookupTopic
+	return info.Size(m)
+}
+
+// XXX_DiscardUnknown delegates to xxx_messageInfo_CommandLookupTopic.DiscardUnknown.
+func (m *CommandLookupTopic) XXX_DiscardUnknown() {
+	info := &xxx_messageInfo_CommandLookupTopic
+	info.DiscardUnknown(m)
+}
+
+// GetTopic returns the Topic field, or "" when the receiver or the field is nil.
+func (m *CommandLookupTopic) GetTopic() string {
+	if m == nil || m.Topic == nil {
+		return ""
+	}
+	return *m.Topic
+}
+
+// GetRequestId returns the RequestId field, or 0 when the receiver or the field is nil.
+func (m *CommandLookupTopic) GetRequestId() uint64 {
+	if m == nil || m.RequestId == nil {
+		return 0
+	}
+	return *m.RequestId
+}
+
+// GetAuthoritative returns the Authoritative field, or Default_CommandLookupTopic_Authoritative
+// when the receiver or the field is nil.
+func (m *CommandLookupTopic) GetAuthoritative() bool {
+	if m == nil || m.Authoritative == nil {
+		return Default_CommandLookupTopic_Authoritative
+	}
+	return *m.Authoritative
+}
+
+// GetOriginalPrincipal returns the OriginalPrincipal field, or "" when the receiver or the field is nil.
+func (m *CommandLookupTopic) GetOriginalPrincipal() string {
+	if m == nil || m.OriginalPrincipal == nil {
+		return ""
+	}
+	return *m.OriginalPrincipal
+}
+
+// GetOriginalAuthData returns the OriginalAuthData field, or "" when the receiver or the field is nil.
+func (m *CommandLookupTopic) GetOriginalAuthData() string {
+	if m == nil || m.OriginalAuthData == nil {
+		return ""
+	}
+	return *m.OriginalAuthData
+}
+
+// GetOriginalAuthMethod returns the OriginalAuthMethod field, or "" when the receiver or the field is nil.
+func (m *CommandLookupTopic) GetOriginalAuthMethod() string {
+	if m == nil || m.OriginalAuthMethod == nil {
+		return ""
+	}
+	return *m.OriginalAuthMethod
+}
+// CommandLookupTopicResponse is the generated Go struct for the pulsar.proto CommandLookupTopicResponse message (descriptor index 13); pointer-typed scalars are nil when unset and the Get* accessors supply the defaults.
+type CommandLookupTopicResponse struct {
+ BrokerServiceUrl *string `protobuf:"bytes,1,opt,name=brokerServiceUrl" json:"brokerServiceUrl,omitempty"`
+ BrokerServiceUrlTls *string `protobuf:"bytes,2,opt,name=brokerServiceUrlTls" json:"brokerServiceUrlTls,omitempty"`
+ Response *CommandLookupTopicResponse_LookupType `protobuf:"varint,3,opt,name=response,enum=pulsar.proto.CommandLookupTopicResponse_LookupType" json:"response,omitempty"`
+ RequestId *uint64 `protobuf:"varint,4,req,name=request_id,json=requestId" json:"request_id,omitempty"` // the only field tagged "req" in this message
+ Authoritative *bool `protobuf:"varint,5,opt,name=authoritative,def=0" json:"authoritative,omitempty"`
+ Error *ServerError `protobuf:"varint,6,opt,name=error,enum=pulsar.proto.ServerError" json:"error,omitempty"`
+ Message *string `protobuf:"bytes,7,opt,name=message" json:"message,omitempty"`
+ // If true, indicates to the client that it must
+ // always connect through the service url after the
+ // lookup has been completed.
+ ProxyThroughServiceUrl *bool `protobuf:"varint,8,opt,name=proxy_through_service_url,json=proxyThroughServiceUrl,def=0" json:"proxy_through_service_url,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+// xxx_messageInfo_CommandLookupTopicResponse is the proto.InternalMessageInfo the XXX_* methods
+// of CommandLookupTopicResponse delegate to.
+var xxx_messageInfo_CommandLookupTopicResponse proto.InternalMessageInfo
+
+// Default_CommandLookupTopicResponse_Authoritative is what GetAuthoritative returns when the field is unset.
+const Default_CommandLookupTopicResponse_Authoritative bool = false
+
+// Default_CommandLookupTopicResponse_ProxyThroughServiceUrl is what GetProxyThroughServiceUrl returns when the field is unset.
+const Default_CommandLookupTopicResponse_ProxyThroughServiceUrl bool = false
+
+// Reset replaces *m with an empty CommandLookupTopicResponse.
+func (m *CommandLookupTopicResponse) Reset() {
+	*m = CommandLookupTopicResponse{}
+}
+
+// String returns proto.CompactTextString(m).
+func (m *CommandLookupTopicResponse) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage has an empty body.
+func (*CommandLookupTopicResponse) ProtoMessage() {}
+
+// Descriptor returns fileDescriptor_PulsarApi_9ff3bd5e091a4809 and the index path []int{13}.
+func (*CommandLookupTopicResponse) Descriptor() ([]byte, []int) {
+	path := []int{13}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal delegates to xxx_messageInfo_CommandLookupTopicResponse.Unmarshal with b as input.
+func (m *CommandLookupTopicResponse) XXX_Unmarshal(b []byte) error {
+	info := &xxx_messageInfo_CommandLookupTopicResponse
+	return info.Unmarshal(m, b)
+}
+
+// XXX_Marshal delegates to xxx_messageInfo_CommandLookupTopicResponse.Marshal, forwarding b and deterministic.
+func (m *CommandLookupTopicResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	info := &xxx_messageInfo_CommandLookupTopicResponse
+	return info.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge delegates to xxx_messageInfo_CommandLookupTopicResponse.Merge with the receiver as destination.
+func (m *CommandLookupTopicResponse) XXX_Merge(src proto.Message) {
+	info := &xxx_messageInfo_CommandLookupTopicResponse
+	info.Merge(m, src)
+}
+
+// XXX_Size returns xxx_messageInfo_CommandLookupTopicResponse.Size(m).
+func (m *CommandLookupTopicResponse) XXX_Size() int {
+	info := &xxx_messageInfo_CommandLookupTopicResponse
+	return info.Size(m)
+}
+
+// XXX_DiscardUnknown delegates to xxx_messageInfo_CommandLookupTopicResponse.DiscardUnknown.
+func (m *CommandLookupTopicResponse) XXX_DiscardUnknown() {
+	info := &xxx_messageInfo_CommandLookupTopicResponse
+	info.DiscardUnknown(m)
+}
+
+// GetBrokerServiceUrl returns the BrokerServiceUrl field, or "" when the receiver or the field is nil.
+func (m *CommandLookupTopicResponse) GetBrokerServiceUrl() string {
+	if m == nil || m.BrokerServiceUrl == nil {
+		return ""
+	}
+	return *m.BrokerServiceUrl
+}
+
+// GetBrokerServiceUrlTls returns the BrokerServiceUrlTls field, or "" when the receiver or the field is nil.
+func (m *CommandLookupTopicResponse) GetBrokerServiceUrlTls() string {
+	if m == nil || m.BrokerServiceUrlTls == nil {
+		return ""
+	}
+	return *m.BrokerServiceUrlTls
+}
+
+// GetResponse returns the Response field, or CommandLookupTopicResponse_Redirect
+// when the receiver or the field is nil.
+func (m *CommandLookupTopicResponse) GetResponse() CommandLookupTopicResponse_LookupType {
+	if m == nil || m.Response == nil {
+		return CommandLookupTopicResponse_Redirect
+	}
+	return *m.Response
+}
+
+// GetRequestId returns the RequestId field, or 0 when the receiver or the field is nil.
+func (m *CommandLookupTopicResponse) GetRequestId() uint64 {
+	if m == nil || m.RequestId == nil {
+		return 0
+	}
+	return *m.RequestId
+}
+
+// GetAuthoritative returns the Authoritative field, or Default_CommandLookupTopicResponse_Authoritative
+// when the receiver or the field is nil.
+func (m *CommandLookupTopicResponse) GetAuthoritative() bool {
+	if m == nil || m.Authoritative == nil {
+		return Default_CommandLookupTopicResponse_Authoritative
+	}
+	return *m.Authoritative
+}
+
+// GetError returns the Error field, or ServerError_UnknownError
+// when the receiver or the field is nil.
+func (m *CommandLookupTopicResponse) GetError() ServerError {
+	if m == nil || m.Error == nil {
+		return ServerError_UnknownError
+	}
+	return *m.Error
+}
+
+// GetMessage returns the Message field, or "" when the receiver or the field is nil.
+func (m *CommandLookupTopicResponse) GetMessage() string {
+	if m == nil || m.Message == nil {
+		return ""
+	}
+	return *m.Message
+}
+
+// GetProxyThroughServiceUrl returns the ProxyThroughServiceUrl field, or
+// Default_CommandLookupTopicResponse_ProxyThroughServiceUrl when the receiver or the field is nil.
+func (m *CommandLookupTopicResponse) GetProxyThroughServiceUrl() bool {
+	if m == nil || m.ProxyThroughServiceUrl == nil {
+		return Default_CommandLookupTopicResponse_ProxyThroughServiceUrl
+	}
+	return *m.ProxyThroughServiceUrl
+}
+
+// CommandProducer creates a new Producer on a topic, assigning the given producer_id;
+// all messages sent with this producer_id will be persisted on the topic.
+type CommandProducer struct {
+ Topic *string `protobuf:"bytes,1,req,name=topic" json:"topic,omitempty"`
+ ProducerId *uint64 `protobuf:"varint,2,req,name=producer_id,json=producerId" json:"producer_id,omitempty"`
+ RequestId *uint64 `protobuf:"varint,3,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+ // If a producer name is specified, the name will be used;
+ // otherwise the broker will generate a unique name.
+ ProducerName *string `protobuf:"bytes,4,opt,name=producer_name,json=producerName" json:"producer_name,omitempty"`
+ Encrypted *bool `protobuf:"varint,5,opt,name=encrypted,def=0" json:"encrypted,omitempty"`
+ // Add optional metadata key=value to this producer.
+ Metadata []*KeyValue `protobuf:"bytes,6,rep,name=metadata" json:"metadata,omitempty"`
+ Schema *Schema `protobuf:"bytes,7,opt,name=schema" json:"schema,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *CommandProducer) Reset() { *m = CommandProducer{} }
+func (m *CommandProducer) String() string { return proto.CompactTextString(m) }
+func (*CommandProducer) ProtoMessage() {}
+func (*CommandProducer) Descriptor() ([]byte, []int) {
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{14}
+}
+func (m *CommandProducer) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_CommandProducer.Unmarshal(m, b)
+}
+func (m *CommandProducer) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandProducer.Marshal(b, m, deterministic)
+}
+func (dst *CommandProducer) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandProducer.Merge(dst, src)
+}
+func (m *CommandProducer) XXX_Size() int {
+ return xxx_messageInfo_CommandProducer.Size(m)
+}
+func (m *CommandProducer) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandProducer.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_CommandProducer proto.InternalMessageInfo
+
+const Default_CommandProducer_Encrypted bool = false
+
+func (m *CommandProducer) GetTopic() string {
+ if m != nil && m.Topic != nil {
+ return *m.Topic
+ }
+ return ""
+}
+
+func (m *CommandProducer) GetProducerId() uint64 {
+ if m != nil && m.ProducerId != nil {
+ return *m.ProducerId
+ }
+ return 0
+}
+
+func (m *CommandProducer) GetRequestId() uint64 {
+ if m != nil && m.RequestId != nil {
+ return *m.RequestId
+ }
+ return 0
+}
+
+func (m *CommandProducer) GetProducerName() string {
+ if m != nil && m.ProducerName != nil {
+ return *m.ProducerName
+ }
+ return ""
+}
+
+func (m *CommandProducer) GetEncrypted() bool {
+ if m != nil && m.Encrypted != nil {
+ return *m.Encrypted
+ }
+ return Default_CommandProducer_Encrypted
+}
+
+func (m *CommandProducer) GetMetadata() []*KeyValue {
+ if m != nil {
+ return m.Metadata
+ }
+ return nil
+}
+
+func (m *CommandProducer) GetSchema() *Schema {
+ if m != nil {
+ return m.Schema
+ }
+ return nil
+}
+// CommandSend carries a required producer_id and sequence_id plus an optional num_messages count (default 1); pointer fields are nil when unset.
+type CommandSend struct {
+ ProducerId *uint64 `protobuf:"varint,1,req,name=producer_id,json=producerId" json:"producer_id,omitempty"`
+ SequenceId *uint64 `protobuf:"varint,2,req,name=sequence_id,json=sequenceId" json:"sequence_id,omitempty"`
+ NumMessages *int32 `protobuf:"varint,3,opt,name=num_messages,json=numMessages,def=1" json:"num_messages,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+// Reset overwrites *m with an empty CommandSend; String forwards to proto.CompactTextString and the XXX_* hooks forward to xxx_messageInfo_CommandSend.
+func (m *CommandSend) Reset() { *m = CommandSend{} }
+func (m *CommandSend) String() string { return proto.CompactTextString(m) }
+func (*CommandSend) ProtoMessage() {}
+func (*CommandSend) Descriptor() ([]byte, []int) {
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{15}
+}
+func (m *CommandSend) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_CommandSend.Unmarshal(m, b)
+}
+func (m *CommandSend) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandSend.Marshal(b, m, deterministic)
+}
+func (dst *CommandSend) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandSend.Merge(dst, src)
+}
+func (m *CommandSend) XXX_Size() int {
+ return xxx_messageInfo_CommandSend.Size(m)
+}
+func (m *CommandSend) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandSend.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_CommandSend proto.InternalMessageInfo
+// Default_CommandSend_NumMessages is the value GetNumMessages returns when the field is unset.
+const Default_CommandSend_NumMessages int32 = 1
+// GetProducerId returns *m.ProducerId, or 0 if m or the field is nil.
+func (m *CommandSend) GetProducerId() uint64 {
+ if m != nil && m.ProducerId != nil {
+ return *m.ProducerId
+ }
+ return 0
+}
+// GetSequenceId returns *m.SequenceId, or 0 if m or the field is nil.
+func (m *CommandSend) GetSequenceId() uint64 {
+ if m != nil && m.SequenceId != nil {
+ return *m.SequenceId
+ }
+ return 0
+}
+// GetNumMessages returns *m.NumMessages, or Default_CommandSend_NumMessages if m or the field is nil.
+func (m *CommandSend) GetNumMessages() int32 {
+ if m != nil && m.NumMessages != nil {
+ return *m.NumMessages
+ }
+ return Default_CommandSend_NumMessages
+}
+// CommandSendReceipt carries a required producer_id and sequence_id plus an optional message_id; pointer fields are nil when unset.
+type CommandSendReceipt struct {
+ ProducerId *uint64 `protobuf:"varint,1,req,name=producer_id,json=producerId" json:"producer_id,omitempty"`
+ SequenceId *uint64 `protobuf:"varint,2,req,name=sequence_id,json=sequenceId" json:"sequence_id,omitempty"`
+ MessageId *MessageIdData `protobuf:"bytes,3,opt,name=message_id,json=messageId" json:"message_id,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+// Reset overwrites *m with an empty CommandSendReceipt; String forwards to proto.CompactTextString and the XXX_* hooks forward to xxx_messageInfo_CommandSendReceipt.
+func (m *CommandSendReceipt) Reset() { *m = CommandSendReceipt{} }
+func (m *CommandSendReceipt) String() string { return proto.CompactTextString(m) }
+func (*CommandSendReceipt) ProtoMessage() {}
+func (*CommandSendReceipt) Descriptor() ([]byte, []int) {
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{16}
+}
+func (m *CommandSendReceipt) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_CommandSendReceipt.Unmarshal(m, b)
+}
+func (m *CommandSendReceipt) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandSendReceipt.Marshal(b, m, deterministic)
+}
+func (dst *CommandSendReceipt) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandSendReceipt.Merge(dst, src)
+}
+func (m *CommandSendReceipt) XXX_Size() int {
+ return xxx_messageInfo_CommandSendReceipt.Size(m)
+}
+func (m *CommandSendReceipt) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandSendReceipt.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_CommandSendReceipt proto.InternalMessageInfo
+// GetProducerId returns *m.ProducerId, or 0 if m or the field is nil.
+func (m *CommandSendReceipt) GetProducerId() uint64 {
+ if m != nil && m.ProducerId != nil {
+ return *m.ProducerId
+ }
+ return 0
+}
+// GetSequenceId returns *m.SequenceId, or 0 if m or the field is nil.
+func (m *CommandSendReceipt) GetSequenceId() uint64 {
+ if m != nil && m.SequenceId != nil {
+ return *m.SequenceId
+ }
+ return 0
+}
+// GetMessageId returns m.MessageId, or nil if m is nil.
+func (m *CommandSendReceipt) GetMessageId() *MessageIdData {
+ if m != nil {
+ return m.MessageId
+ }
+ return nil
+}
+// CommandSendError carries a required producer_id, sequence_id, ServerError code and message string; pointer fields are nil when unset.
+type CommandSendError struct {
+ ProducerId *uint64 `protobuf:"varint,1,req,name=producer_id,json=producerId" json:"producer_id,omitempty"`
+ SequenceId *uint64 `protobuf:"varint,2,req,name=sequence_id,json=sequenceId" json:"sequence_id,omitempty"`
+ Error *ServerError `protobuf:"varint,3,req,name=error,enum=pulsar.proto.ServerError" json:"error,omitempty"`
+ Message *string `protobuf:"bytes,4,req,name=message" json:"message,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+// Reset overwrites *m with an empty CommandSendError; String forwards to proto.CompactTextString and the XXX_* hooks forward to xxx_messageInfo_CommandSendError.
+func (m *CommandSendError) Reset() { *m = CommandSendError{} }
+func (m *CommandSendError) String() string { return proto.CompactTextString(m) }
+func (*CommandSendError) ProtoMessage() {}
+func (*CommandSendError) Descriptor() ([]byte, []int) {
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{17}
+}
+func (m *CommandSendError) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_CommandSendError.Unmarshal(m, b)
+}
+func (m *CommandSendError) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandSendError.Marshal(b, m, deterministic)
+}
+func (dst *CommandSendError) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandSendError.Merge(dst, src)
+}
+func (m *CommandSendError) XXX_Size() int {
+ return xxx_messageInfo_CommandSendError.Size(m)
+}
+func (m *CommandSendError) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandSendError.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_CommandSendError proto.InternalMessageInfo
+// GetProducerId returns *m.ProducerId, or 0 if m or the field is nil.
+func (m *CommandSendError) GetProducerId() uint64 {
+ if m != nil && m.ProducerId != nil {
+ return *m.ProducerId
+ }
+ return 0
+}
+// GetSequenceId returns *m.SequenceId, or 0 if m or the field is nil.
+func (m *CommandSendError) GetSequenceId() uint64 {
+ if m != nil && m.SequenceId != nil {
+ return *m.SequenceId
+ }
+ return 0
+}
+// GetError returns *m.Error, or ServerError_UnknownError if m or the field is nil.
+func (m *CommandSendError) GetError() ServerError {
+ if m != nil && m.Error != nil {
+ return *m.Error
+ }
+ return ServerError_UnknownError
+}
+// GetMessage returns *m.Message, or "" if m or the field is nil.
+func (m *CommandSendError) GetMessage() string {
+ if m != nil && m.Message != nil {
+ return *m.Message
+ }
+ return ""
+}
+// CommandMessage carries a required consumer_id and message_id plus an optional redelivery_count (default 0); pointer fields are nil when unset.
+type CommandMessage struct {
+ ConsumerId *uint64 `protobuf:"varint,1,req,name=consumer_id,json=consumerId" json:"consumer_id,omitempty"`
+ MessageId *MessageIdData `protobuf:"bytes,2,req,name=message_id,json=messageId" json:"message_id,omitempty"`
+ RedeliveryCount *uint32 `protobuf:"varint,3,opt,name=redelivery_count,json=redeliveryCount,def=0" json:"redelivery_count,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+// Reset overwrites *m with an empty CommandMessage; String forwards to proto.CompactTextString and the XXX_* hooks forward to xxx_messageInfo_CommandMessage.
+func (m *CommandMessage) Reset() { *m = CommandMessage{} }
+func (m *CommandMessage) String() string { return proto.CompactTextString(m) }
+func (*CommandMessage) ProtoMessage() {}
+func (*CommandMessage) Descriptor() ([]byte, []int) {
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{18}
+}
+func (m *CommandMessage) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_CommandMessage.Unmarshal(m, b)
+}
+func (m *CommandMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandMessage.Marshal(b, m, deterministic)
+}
+func (dst *CommandMessage) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandMessage.Merge(dst, src)
+}
+func (m *CommandMessage) XXX_Size() int {
+ return xxx_messageInfo_CommandMessage.Size(m)
+}
+func (m *CommandMessage) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandMessage.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_CommandMessage proto.InternalMessageInfo
+// Default_CommandMessage_RedeliveryCount is the value GetRedeliveryCount returns when the field is unset.
+const Default_CommandMessage_RedeliveryCount uint32 = 0
+// GetConsumerId returns *m.ConsumerId, or 0 if m or the field is nil.
+func (m *CommandMessage) GetConsumerId() uint64 {
+ if m != nil && m.ConsumerId != nil {
+ return *m.ConsumerId
+ }
+ return 0
+}
+// GetMessageId returns m.MessageId, or nil if m is nil.
+func (m *CommandMessage) GetMessageId() *MessageIdData {
+ if m != nil {
+ return m.MessageId
+ }
+ return nil
+}
+// GetRedeliveryCount returns *m.RedeliveryCount, or Default_CommandMessage_RedeliveryCount if m or the field is nil.
+func (m *CommandMessage) GetRedeliveryCount() uint32 {
+ if m != nil && m.RedeliveryCount != nil {
+ return *m.RedeliveryCount
+ }
+ return Default_CommandMessage_RedeliveryCount
+}
+// CommandAck carries a required consumer_id and ack_type, repeated message ids, an optional validation_error and repeated properties.
+type CommandAck struct {
+ ConsumerId *uint64 `protobuf:"varint,1,req,name=consumer_id,json=consumerId" json:"consumer_id,omitempty"`
+ AckType *CommandAck_AckType `protobuf:"varint,2,req,name=ack_type,json=ackType,enum=pulsar.proto.CommandAck_AckType" json:"ack_type,omitempty"`
+ // In case of individual acks, the client can pass a list of message ids.
+ MessageId []*MessageIdData `protobuf:"bytes,3,rep,name=message_id,json=messageId" json:"message_id,omitempty"`
+ ValidationError *CommandAck_ValidationError `protobuf:"varint,4,opt,name=validation_error,json=validationError,enum=pulsar.proto.CommandAck_ValidationError" json:"validation_error,omitempty"`
+ Properties []*KeyLongValue `protobuf:"bytes,5,rep,name=properties" json:"properties,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+// Reset overwrites *m with an empty CommandAck; String forwards to proto.CompactTextString and the XXX_* hooks forward to xxx_messageInfo_CommandAck.
+func (m *CommandAck) Reset() { *m = CommandAck{} }
+func (m *CommandAck) String() string { return proto.CompactTextString(m) }
+func (*CommandAck) ProtoMessage() {}
+func (*CommandAck) Descriptor() ([]byte, []int) {
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{19}
+}
+func (m *CommandAck) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_CommandAck.Unmarshal(m, b)
+}
+func (m *CommandAck) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandAck.Marshal(b, m, deterministic)
+}
+func (dst *CommandAck) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandAck.Merge(dst, src)
+}
+func (m *CommandAck) XXX_Size() int {
+ return xxx_messageInfo_CommandAck.Size(m)
+}
+func (m *CommandAck) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandAck.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_CommandAck proto.InternalMessageInfo
+// GetConsumerId returns *m.ConsumerId, or 0 if m or the field is nil.
+func (m *CommandAck) GetConsumerId() uint64 {
+ if m != nil && m.ConsumerId != nil {
+ return *m.ConsumerId
+ }
+ return 0
+}
+// GetAckType returns *m.AckType, or CommandAck_Individual if m or the field is nil.
+func (m *CommandAck) GetAckType() CommandAck_AckType {
+ if m != nil && m.AckType != nil {
+ return *m.AckType
+ }
+ return CommandAck_Individual
+}
+// GetMessageId returns the m.MessageId slice, or nil if m is nil.
+func (m *CommandAck) GetMessageId() []*MessageIdData {
+ if m != nil {
+ return m.MessageId
+ }
+ return nil
+}
+// GetValidationError returns *m.ValidationError, or CommandAck_UncompressedSizeCorruption if m or the field is nil.
+func (m *CommandAck) GetValidationError() CommandAck_ValidationError {
+ if m != nil && m.ValidationError != nil {
+ return *m.ValidationError
+ }
+ return CommandAck_UncompressedSizeCorruption
+}
+// GetProperties returns m.Properties, or nil if m is nil.
+func (m *CommandAck) GetProperties() []*KeyLongValue {
+ if m != nil {
+ return m.Properties
+ }
+ return nil
+}
+
+// CommandActiveConsumerChange describes changes on the active consumer: a required consumer_id plus an optional is_active flag (default false).
+type CommandActiveConsumerChange struct {
+ ConsumerId *uint64 `protobuf:"varint,1,req,name=consumer_id,json=consumerId" json:"consumer_id,omitempty"`
+ IsActive *bool `protobuf:"varint,2,opt,name=is_active,json=isActive,def=0" json:"is_active,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+// Reset overwrites *m with an empty CommandActiveConsumerChange; String forwards to proto.CompactTextString and the XXX_* hooks forward to xxx_messageInfo_CommandActiveConsumerChange.
+func (m *CommandActiveConsumerChange) Reset() { *m = CommandActiveConsumerChange{} }
+func (m *CommandActiveConsumerChange) String() string { return proto.CompactTextString(m) }
+func (*CommandActiveConsumerChange) ProtoMessage() {}
+func (*CommandActiveConsumerChange) Descriptor() ([]byte, []int) {
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{20}
+}
+func (m *CommandActiveConsumerChange) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_CommandActiveConsumerChange.Unmarshal(m, b)
+}
+func (m *CommandActiveConsumerChange) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandActiveConsumerChange.Marshal(b, m, deterministic)
+}
+func (dst *CommandActiveConsumerChange) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandActiveConsumerChange.Merge(dst, src)
+}
+func (m *CommandActiveConsumerChange) XXX_Size() int {
+ return xxx_messageInfo_CommandActiveConsumerChange.Size(m)
+}
+func (m *CommandActiveConsumerChange) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandActiveConsumerChange.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_CommandActiveConsumerChange proto.InternalMessageInfo
+// Default_CommandActiveConsumerChange_IsActive is the value GetIsActive returns when the field is unset.
+const Default_CommandActiveConsumerChange_IsActive bool = false
+// GetConsumerId returns *m.ConsumerId, or 0 if m or the field is nil.
+func (m *CommandActiveConsumerChange) GetConsumerId() uint64 {
+ if m != nil && m.ConsumerId != nil {
+ return *m.ConsumerId
+ }
+ return 0
+}
+// GetIsActive returns *m.IsActive, or Default_CommandActiveConsumerChange_IsActive if m or the field is nil.
+func (m *CommandActiveConsumerChange) GetIsActive() bool {
+ if m != nil && m.IsActive != nil {
+ return *m.IsActive
+ }
+ return Default_CommandActiveConsumerChange_IsActive
+}
+// CommandFlow carries a required consumer_id and a required messagePermits count; pointer fields are nil when unset.
+type CommandFlow struct {
+ ConsumerId *uint64 `protobuf:"varint,1,req,name=consumer_id,json=consumerId" json:"consumer_id,omitempty"`
+ // Max number of messages to prefetch, in addition
+ // to any number previously specified.
+ MessagePermits *uint32 `protobuf:"varint,2,req,name=messagePermits" json:"messagePermits,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+// Reset overwrites *m with an empty CommandFlow; String forwards to proto.CompactTextString and the XXX_* hooks forward to xxx_messageInfo_CommandFlow.
+func (m *CommandFlow) Reset() { *m = CommandFlow{} }
+func (m *CommandFlow) String() string { return proto.CompactTextString(m) }
+func (*CommandFlow) ProtoMessage() {}
+func (*CommandFlow) Descriptor() ([]byte, []int) {
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{21}
+}
+func (m *CommandFlow) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_CommandFlow.Unmarshal(m, b)
+}
+func (m *CommandFlow) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandFlow.Marshal(b, m, deterministic)
+}
+func (dst *CommandFlow) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandFlow.Merge(dst, src)
+}
+func (m *CommandFlow) XXX_Size() int {
+ return xxx_messageInfo_CommandFlow.Size(m)
+}
+func (m *CommandFlow) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandFlow.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_CommandFlow proto.InternalMessageInfo
+// GetConsumerId returns *m.ConsumerId, or 0 if m or the field is nil.
+func (m *CommandFlow) GetConsumerId() uint64 {
+ if m != nil && m.ConsumerId != nil {
+ return *m.ConsumerId
+ }
+ return 0
+}
+// GetMessagePermits returns *m.MessagePermits, or 0 if m or the field is nil.
+func (m *CommandFlow) GetMessagePermits() uint32 {
+ if m != nil && m.MessagePermits != nil {
+ return *m.MessagePermits
+ }
+ return 0
+}
+// CommandUnsubscribe carries a required consumer_id and request_id; pointer fields are nil when unset.
+type CommandUnsubscribe struct {
+ ConsumerId *uint64 `protobuf:"varint,1,req,name=consumer_id,json=consumerId" json:"consumer_id,omitempty"`
+ RequestId *uint64 `protobuf:"varint,2,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+// Reset overwrites *m with an empty CommandUnsubscribe; String forwards to proto.CompactTextString and the XXX_* hooks forward to xxx_messageInfo_CommandUnsubscribe.
+func (m *CommandUnsubscribe) Reset() { *m = CommandUnsubscribe{} }
+func (m *CommandUnsubscribe) String() string { return proto.CompactTextString(m) }
+func (*CommandUnsubscribe) ProtoMessage() {}
+func (*CommandUnsubscribe) Descriptor() ([]byte, []int) {
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{22}
+}
+func (m *CommandUnsubscribe) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_CommandUnsubscribe.Unmarshal(m, b)
+}
+func (m *CommandUnsubscribe) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandUnsubscribe.Marshal(b, m, deterministic)
+}
+func (dst *CommandUnsubscribe) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandUnsubscribe.Merge(dst, src)
+}
+func (m *CommandUnsubscribe) XXX_Size() int {
+ return xxx_messageInfo_CommandUnsubscribe.Size(m)
+}
+func (m *CommandUnsubscribe) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandUnsubscribe.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_CommandUnsubscribe proto.InternalMessageInfo
+// GetConsumerId returns *m.ConsumerId, or 0 if m or the field is nil.
+func (m *CommandUnsubscribe) GetConsumerId() uint64 {
+ if m != nil && m.ConsumerId != nil {
+ return *m.ConsumerId
+ }
+ return 0
+}
+// GetRequestId returns *m.RequestId, or 0 if m or the field is nil.
+func (m *CommandUnsubscribe) GetRequestId() uint64 {
+ if m != nil && m.RequestId != nil {
+ return *m.RequestId
+ }
+ return 0
+}
+
+// CommandSeek resets an existing consumer to a particular message id.
+type CommandSeek struct {
+ ConsumerId *uint64 `protobuf:"varint,1,req,name=consumer_id,json=consumerId" json:"consumer_id,omitempty"`
+ RequestId *uint64 `protobuf:"varint,2,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+ MessageId *MessageIdData `protobuf:"bytes,3,opt,name=message_id,json=messageId" json:"message_id,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+// Reset overwrites *m with an empty CommandSeek; String forwards to proto.CompactTextString and the XXX_* hooks forward to xxx_messageInfo_CommandSeek.
+func (m *CommandSeek) Reset() { *m = CommandSeek{} }
+func (m *CommandSeek) String() string { return proto.CompactTextString(m) }
+func (*CommandSeek) ProtoMessage() {}
+func (*CommandSeek) Descriptor() ([]byte, []int) {
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{23}
+}
+func (m *CommandSeek) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_CommandSeek.Unmarshal(m, b)
+}
+func (m *CommandSeek) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandSeek.Marshal(b, m, deterministic)
+}
+func (dst *CommandSeek) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandSeek.Merge(dst, src)
+}
+func (m *CommandSeek) XXX_Size() int {
+ return xxx_messageInfo_CommandSeek.Size(m)
+}
+func (m *CommandSeek) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandSeek.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_CommandSeek proto.InternalMessageInfo
+// GetConsumerId returns *m.ConsumerId, or 0 if m or the field is nil.
+func (m *CommandSeek) GetConsumerId() uint64 {
+ if m != nil && m.ConsumerId != nil {
+ return *m.ConsumerId
+ }
+ return 0
+}
+// GetRequestId returns *m.RequestId, or 0 if m or the field is nil.
+func (m *CommandSeek) GetRequestId() uint64 {
+ if m != nil && m.RequestId != nil {
+ return *m.RequestId
+ }
+ return 0
+}
+// GetMessageId returns m.MessageId, or nil if m is nil.
+func (m *CommandSeek) GetMessageId() *MessageIdData {
+ if m != nil {
+ return m.MessageId
+ }
+ return nil
+}
+
+// CommandReachedEndOfTopic is the message sent by broker to client when a topic
+// has been forcefully terminated and there are no more
+// messages left to consume.
+type CommandReachedEndOfTopic struct {
+ ConsumerId *uint64 `protobuf:"varint,1,req,name=consumer_id,json=consumerId" json:"consumer_id,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+// Reset overwrites *m with an empty CommandReachedEndOfTopic; String forwards to proto.CompactTextString and the XXX_* hooks forward to xxx_messageInfo_CommandReachedEndOfTopic.
+func (m *CommandReachedEndOfTopic) Reset() { *m = CommandReachedEndOfTopic{} }
+func (m *CommandReachedEndOfTopic) String() string { return proto.CompactTextString(m) }
+func (*CommandReachedEndOfTopic) ProtoMessage() {}
+func (*CommandReachedEndOfTopic) Descriptor() ([]byte, []int) {
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{24}
+}
+func (m *CommandReachedEndOfTopic) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_CommandReachedEndOfTopic.Unmarshal(m, b)
+}
+func (m *CommandReachedEndOfTopic) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandReachedEndOfTopic.Marshal(b, m, deterministic)
+}
+func (dst *CommandReachedEndOfTopic) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandReachedEndOfTopic.Merge(dst, src)
+}
+func (m *CommandReachedEndOfTopic) XXX_Size() int {
+ return xxx_messageInfo_CommandReachedEndOfTopic.Size(m)
+}
+func (m *CommandReachedEndOfTopic) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandReachedEndOfTopic.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_CommandReachedEndOfTopic proto.InternalMessageInfo
+// GetConsumerId returns *m.ConsumerId, or 0 if m or the field is nil.
+func (m *CommandReachedEndOfTopic) GetConsumerId() uint64 {
+ if m != nil && m.ConsumerId != nil {
+ return *m.ConsumerId
+ }
+ return 0
+}
+// CommandCloseProducer carries a required producer_id and request_id; pointer fields are nil when unset.
+type CommandCloseProducer struct {
+ ProducerId *uint64 `protobuf:"varint,1,req,name=producer_id,json=producerId" json:"producer_id,omitempty"`
+ RequestId *uint64 `protobuf:"varint,2,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+// Reset overwrites *m with an empty CommandCloseProducer; String forwards to proto.CompactTextString and the XXX_* hooks forward to xxx_messageInfo_CommandCloseProducer.
+func (m *CommandCloseProducer) Reset() { *m = CommandCloseProducer{} }
+func (m *CommandCloseProducer) String() string { return proto.CompactTextString(m) }
+func (*CommandCloseProducer) ProtoMessage() {}
+func (*CommandCloseProducer) Descriptor() ([]byte, []int) {
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{25}
+}
+func (m *CommandCloseProducer) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_CommandCloseProducer.Unmarshal(m, b)
+}
+func (m *CommandCloseProducer) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandCloseProducer.Marshal(b, m, deterministic)
+}
+func (dst *CommandCloseProducer) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandCloseProducer.Merge(dst, src)
+}
+func (m *CommandCloseProducer) XXX_Size() int {
+ return xxx_messageInfo_CommandCloseProducer.Size(m)
+}
+func (m *CommandCloseProducer) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandCloseProducer.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_CommandCloseProducer proto.InternalMessageInfo
+// GetProducerId returns *m.ProducerId, or 0 if m or the field is nil.
+func (m *CommandCloseProducer) GetProducerId() uint64 {
+ if m != nil && m.ProducerId != nil {
+ return *m.ProducerId
+ }
+ return 0
+}
+// GetRequestId returns *m.RequestId, or 0 if m or the field is nil.
+func (m *CommandCloseProducer) GetRequestId() uint64 {
+ if m != nil && m.RequestId != nil {
+ return *m.RequestId
+ }
+ return 0
+}
+// CommandCloseConsumer carries a required consumer_id and request_id; pointer fields are nil when unset.
+type CommandCloseConsumer struct {
+ ConsumerId *uint64 `protobuf:"varint,1,req,name=consumer_id,json=consumerId" json:"consumer_id,omitempty"`
+ RequestId *uint64 `protobuf:"varint,2,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+// Reset overwrites *m with an empty CommandCloseConsumer; String forwards to proto.CompactTextString and the XXX_* hooks forward to xxx_messageInfo_CommandCloseConsumer.
+func (m *CommandCloseConsumer) Reset() { *m = CommandCloseConsumer{} }
+func (m *CommandCloseConsumer) String() string { return proto.CompactTextString(m) }
+func (*CommandCloseConsumer) ProtoMessage() {}
+func (*CommandCloseConsumer) Descriptor() ([]byte, []int) {
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{26}
+}
+func (m *CommandCloseConsumer) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_CommandCloseConsumer.Unmarshal(m, b)
+}
+func (m *CommandCloseConsumer) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandCloseConsumer.Marshal(b, m, deterministic)
+}
+func (dst *CommandCloseConsumer) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandCloseConsumer.Merge(dst, src)
+}
+func (m *CommandCloseConsumer) XXX_Size() int {
+ return xxx_messageInfo_CommandCloseConsumer.Size(m)
+}
+func (m *CommandCloseConsumer) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandCloseConsumer.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_CommandCloseConsumer proto.InternalMessageInfo
+// GetConsumerId returns *m.ConsumerId, or 0 if m or the field is nil.
+func (m *CommandCloseConsumer) GetConsumerId() uint64 {
+ if m != nil && m.ConsumerId != nil {
+ return *m.ConsumerId
+ }
+ return 0
+}
+// GetRequestId returns *m.RequestId, or 0 if m or the field is nil.
+func (m *CommandCloseConsumer) GetRequestId() uint64 {
+ if m != nil && m.RequestId != nil {
+ return *m.RequestId
+ }
+ return 0
+}
+// CommandRedeliverUnacknowledgedMessages carries a required consumer_id plus a repeated list of message_ids.
+type CommandRedeliverUnacknowledgedMessages struct {
+ ConsumerId *uint64 `protobuf:"varint,1,req,name=consumer_id,json=consumerId" json:"consumer_id,omitempty"`
+ MessageIds []*MessageIdData `protobuf:"bytes,2,rep,name=message_ids,json=messageIds" json:"message_ids,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+// Reset overwrites *m with an empty CommandRedeliverUnacknowledgedMessages; String forwards to proto.CompactTextString and the XXX_* hooks forward to xxx_messageInfo_CommandRedeliverUnacknowledgedMessages.
+func (m *CommandRedeliverUnacknowledgedMessages) Reset() {
+ *m = CommandRedeliverUnacknowledgedMessages{}
+}
+func (m *CommandRedeliverUnacknowledgedMessages) String() string { return proto.CompactTextString(m) }
+func (*CommandRedeliverUnacknowledgedMessages) ProtoMessage() {}
+func (*CommandRedeliverUnacknowledgedMessages) Descriptor() ([]byte, []int) {
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{27}
+}
+func (m *CommandRedeliverUnacknowledgedMessages) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_CommandRedeliverUnacknowledgedMessages.Unmarshal(m, b)
+}
+func (m *CommandRedeliverUnacknowledgedMessages) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandRedeliverUnacknowledgedMessages.Marshal(b, m, deterministic)
+}
+func (dst *CommandRedeliverUnacknowledgedMessages) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandRedeliverUnacknowledgedMessages.Merge(dst, src)
+}
+func (m *CommandRedeliverUnacknowledgedMessages) XXX_Size() int {
+ return xxx_messageInfo_CommandRedeliverUnacknowledgedMessages.Size(m)
+}
+func (m *CommandRedeliverUnacknowledgedMessages) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandRedeliverUnacknowledgedMessages.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_CommandRedeliverUnacknowledgedMessages proto.InternalMessageInfo
+// GetConsumerId returns *m.ConsumerId, or 0 if m or the field is nil.
+func (m *CommandRedeliverUnacknowledgedMessages) GetConsumerId() uint64 {
+ if m != nil && m.ConsumerId != nil {
+ return *m.ConsumerId
+ }
+ return 0
+}
+// GetMessageIds returns m.MessageIds, or nil if m is nil.
+func (m *CommandRedeliverUnacknowledgedMessages) GetMessageIds() []*MessageIdData {
+ if m != nil {
+ return m.MessageIds
+ }
+ return nil
+}
+// CommandSuccess carries a required request_id plus an optional schema; pointer fields are nil when unset.
+type CommandSuccess struct {
+ RequestId *uint64 `protobuf:"varint,1,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+ Schema *Schema `protobuf:"bytes,2,opt,name=schema" json:"schema,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+// Reset overwrites *m with an empty CommandSuccess; String forwards to proto.CompactTextString and the XXX_* hooks forward to xxx_messageInfo_CommandSuccess.
+func (m *CommandSuccess) Reset() { *m = CommandSuccess{} }
+func (m *CommandSuccess) String() string { return proto.CompactTextString(m) }
+func (*CommandSuccess) ProtoMessage() {}
+func (*CommandSuccess) Descriptor() ([]byte, []int) {
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{28}
+}
+func (m *CommandSuccess) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_CommandSuccess.Unmarshal(m, b)
+}
+func (m *CommandSuccess) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandSuccess.Marshal(b, m, deterministic)
+}
+func (dst *CommandSuccess) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandSuccess.Merge(dst, src)
+}
+func (m *CommandSuccess) XXX_Size() int {
+ return xxx_messageInfo_CommandSuccess.Size(m)
+}
+func (m *CommandSuccess) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandSuccess.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_CommandSuccess proto.InternalMessageInfo
+// GetRequestId returns *m.RequestId, or 0 if m or the field is nil.
+func (m *CommandSuccess) GetRequestId() uint64 {
+ if m != nil && m.RequestId != nil {
+ return *m.RequestId
+ }
+ return 0
+}
+// GetSchema returns m.Schema, or nil if m is nil.
+func (m *CommandSuccess) GetSchema() *Schema {
+ if m != nil {
+ return m.Schema
+ }
+ return nil
+}
+
+// CommandProducerSuccess is the response from CommandProducer.
+type CommandProducerSuccess struct {
+ RequestId *uint64 `protobuf:"varint,1,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+ ProducerName *string `protobuf:"bytes,2,req,name=producer_name,json=producerName" json:"producer_name,omitempty"`
+ // The last sequence id that was stored by this producer in the previous session.
+ // This will only be meaningful if deduplication has been enabled.
+ LastSequenceId *int64 `protobuf:"varint,3,opt,name=last_sequence_id,json=lastSequenceId,def=-1" json:"last_sequence_id,omitempty"`
+ SchemaVersion []byte `protobuf:"bytes,4,opt,name=schema_version,json=schemaVersion" json:"schema_version,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+// Reset overwrites *m with an empty CommandProducerSuccess; String forwards to proto.CompactTextString and the XXX_* hooks forward to xxx_messageInfo_CommandProducerSuccess.
+func (m *CommandProducerSuccess) Reset() { *m = CommandProducerSuccess{} }
+func (m *CommandProducerSuccess) String() string { return proto.CompactTextString(m) }
+func (*CommandProducerSuccess) ProtoMessage() {}
+func (*CommandProducerSuccess) Descriptor() ([]byte, []int) {
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{29}
+}
+func (m *CommandProducerSuccess) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_CommandProducerSuccess.Unmarshal(m, b)
+}
+func (m *CommandProducerSuccess) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandProducerSuccess.Marshal(b, m, deterministic)
+}
+func (dst *CommandProducerSuccess) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandProducerSuccess.Merge(dst, src)
+}
+func (m *CommandProducerSuccess) XXX_Size() int {
+ return xxx_messageInfo_CommandProducerSuccess.Size(m)
+}
+func (m *CommandProducerSuccess) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandProducerSuccess.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_CommandProducerSuccess proto.InternalMessageInfo
+// Default_CommandProducerSuccess_LastSequenceId is the value GetLastSequenceId returns when the field is unset.
+const Default_CommandProducerSuccess_LastSequenceId int64 = -1
+// GetRequestId returns *m.RequestId, or 0 if m or the field is nil.
+func (m *CommandProducerSuccess) GetRequestId() uint64 {
+ if m != nil && m.RequestId != nil {
+ return *m.RequestId
+ }
+ return 0
+}
+// GetProducerName returns *m.ProducerName, or "" if m or the field is nil.
+func (m *CommandProducerSuccess) GetProducerName() string {
+ if m != nil && m.ProducerName != nil {
+ return *m.ProducerName
+ }
+ return ""
+}
+// GetLastSequenceId returns *m.LastSequenceId, or Default_CommandProducerSuccess_LastSequenceId if m or the field is nil.
+func (m *CommandProducerSuccess) GetLastSequenceId() int64 {
+ if m != nil && m.LastSequenceId != nil {
+ return *m.LastSequenceId
+ }
+ return Default_CommandProducerSuccess_LastSequenceId
+}
+// GetSchemaVersion returns m.SchemaVersion, or nil if m is nil.
+func (m *CommandProducerSuccess) GetSchemaVersion() []byte {
+ if m != nil {
+ return m.SchemaVersion
+ }
+ return nil
+}
+// CommandError carries a required request_id, ServerError code and message string; pointer fields are nil when unset.
+type CommandError struct {
+ RequestId *uint64 `protobuf:"varint,1,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+ Error *ServerError `protobuf:"varint,2,req,name=error,enum=pulsar.proto.ServerError" json:"error,omitempty"`
+ Message *string `protobuf:"bytes,3,req,name=message" json:"message,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+// Reset overwrites *m with an empty CommandError; String forwards to proto.CompactTextString and the XXX_* hooks forward to xxx_messageInfo_CommandError.
+func (m *CommandError) Reset() { *m = CommandError{} }
+func (m *CommandError) String() string { return proto.CompactTextString(m) }
+func (*CommandError) ProtoMessage() {}
+func (*CommandError) Descriptor() ([]byte, []int) {
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{30}
+}
+func (m *CommandError) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_CommandError.Unmarshal(m, b)
+}
+func (m *CommandError) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandError.Marshal(b, m, deterministic)
+}
+func (dst *CommandError) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandError.Merge(dst, src)
+}
+func (m *CommandError) XXX_Size() int {
+ return xxx_messageInfo_CommandError.Size(m)
+}
+func (m *CommandError) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandError.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_CommandError proto.InternalMessageInfo
+// GetRequestId returns *m.RequestId, or 0 if m or the field is nil.
+func (m *CommandError) GetRequestId() uint64 {
+ if m != nil && m.RequestId != nil {
+ return *m.RequestId
+ }
+ return 0
+}
+// GetError returns *m.Error, or ServerError_UnknownError if m or the field is nil.
+func (m *CommandError) GetError() ServerError {
+ if m != nil && m.Error != nil {
+ return *m.Error
+ }
+ return ServerError_UnknownError
+}
+// GetMessage returns *m.Message, or "" if m or the field is nil.
+func (m *CommandError) GetMessage() string {
+ if m != nil && m.Message != nil {
+ return *m.Message
+ }
+ return ""
+}
+
+ // CommandPing is a command used to probe the state of the connection.
+ // When either client or broker doesn't receive commands for a certain
+ // amount of time, it will send a Ping probe.
+ type CommandPing struct { // The struct declares no message fields, only the generator's XXX_ bookkeeping fields.
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+ }
+
+ func (m *CommandPing) Reset() { *m = CommandPing{} } // Reset overwrites *m with an empty CommandPing.
+ func (m *CommandPing) String() string { return proto.CompactTextString(m) } // String formats m with proto.CompactTextString.
+ func (*CommandPing) ProtoMessage() {} // ProtoMessage is a no-op (empty body).
+ func (*CommandPing) Descriptor() ([]byte, []int) { // Descriptor returns the file descriptor variable and the index path {31}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{31}
+ }
+ func (m *CommandPing) XXX_Unmarshal(b []byte) error { // This and the remaining XXX_ methods forward to xxx_messageInfo_CommandPing.
+ return xxx_messageInfo_CommandPing.Unmarshal(m, b)
+ }
+ func (m *CommandPing) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandPing.Marshal(b, m, deterministic)
+ }
+ func (dst *CommandPing) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandPing.Merge(dst, src)
+ }
+ func (m *CommandPing) XXX_Size() int {
+ return xxx_messageInfo_CommandPing.Size(m)
+ }
+ func (m *CommandPing) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandPing.DiscardUnknown(m)
+ }
+ // xxx_messageInfo_CommandPing is the proto.InternalMessageInfo that the XXX_ methods of CommandPing call into.
+ var xxx_messageInfo_CommandPing proto.InternalMessageInfo
+
+ type CommandPong struct { // CommandPong declares no message fields of its own. NOTE(review): presumably the reply to CommandPing - confirm against the protocol docs.
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+ }
+
+ func (m *CommandPong) Reset() { *m = CommandPong{} } // Reset overwrites *m with an empty CommandPong.
+ func (m *CommandPong) String() string { return proto.CompactTextString(m) } // String formats m with proto.CompactTextString.
+ func (*CommandPong) ProtoMessage() {} // ProtoMessage is a no-op (empty body).
+ func (*CommandPong) Descriptor() ([]byte, []int) { // Descriptor returns the file descriptor variable and the index path {32}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{32}
+ }
+ func (m *CommandPong) XXX_Unmarshal(b []byte) error { // This and the remaining XXX_ methods forward to xxx_messageInfo_CommandPong.
+ return xxx_messageInfo_CommandPong.Unmarshal(m, b)
+ }
+ func (m *CommandPong) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandPong.Marshal(b, m, deterministic)
+ }
+ func (dst *CommandPong) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandPong.Merge(dst, src)
+ }
+ func (m *CommandPong) XXX_Size() int {
+ return xxx_messageInfo_CommandPong.Size(m)
+ }
+ func (m *CommandPong) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandPong.DiscardUnknown(m)
+ }
+ // xxx_messageInfo_CommandPong is the proto.InternalMessageInfo that the XXX_ methods of CommandPong call into.
+ var xxx_messageInfo_CommandPong proto.InternalMessageInfo
+
+ type CommandConsumerStats struct { // CommandConsumerStats is a generated protobuf message type with two required fields: request_id (1) and consumer_id (4).
+ RequestId *uint64 `protobuf:"varint,1,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+ // required string topic_name = 2;
+ // required string subscription_name = 3;
+ ConsumerId *uint64 `protobuf:"varint,4,req,name=consumer_id,json=consumerId" json:"consumer_id,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+ }
+
+ func (m *CommandConsumerStats) Reset() { *m = CommandConsumerStats{} } // Reset overwrites *m with an empty CommandConsumerStats.
+ func (m *CommandConsumerStats) String() string { return proto.CompactTextString(m) } // String formats m with proto.CompactTextString.
+ func (*CommandConsumerStats) ProtoMessage() {} // ProtoMessage is a no-op (empty body).
+ func (*CommandConsumerStats) Descriptor() ([]byte, []int) { // Descriptor returns the file descriptor variable and the index path {33}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{33}
+ }
+ func (m *CommandConsumerStats) XXX_Unmarshal(b []byte) error { // This and the remaining XXX_ methods forward to xxx_messageInfo_CommandConsumerStats.
+ return xxx_messageInfo_CommandConsumerStats.Unmarshal(m, b)
+ }
+ func (m *CommandConsumerStats) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandConsumerStats.Marshal(b, m, deterministic)
+ }
+ func (dst *CommandConsumerStats) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandConsumerStats.Merge(dst, src)
+ }
+ func (m *CommandConsumerStats) XXX_Size() int {
+ return xxx_messageInfo_CommandConsumerStats.Size(m)
+ }
+ func (m *CommandConsumerStats) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandConsumerStats.DiscardUnknown(m)
+ }
+ // xxx_messageInfo_CommandConsumerStats is the proto.InternalMessageInfo that the XXX_ methods of CommandConsumerStats call into.
+ var xxx_messageInfo_CommandConsumerStats proto.InternalMessageInfo
+ // GetRequestId returns *m.RequestId, or 0 when m or the field is nil.
+ func (m *CommandConsumerStats) GetRequestId() uint64 {
+ if m != nil && m.RequestId != nil {
+ return *m.RequestId
+ }
+ return 0
+ }
+ // GetConsumerId returns *m.ConsumerId, or 0 when m or the field is nil.
+ func (m *CommandConsumerStats) GetConsumerId() uint64 {
+ if m != nil && m.ConsumerId != nil {
+ return *m.ConsumerId
+ }
+ return 0
+ }
+
+ type CommandConsumerStatsResponse struct { // CommandConsumerStatsResponse is a generated protobuf message type: request_id is required, every other field is optional ("opt").
+ RequestId *uint64 `protobuf:"varint,1,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+ ErrorCode *ServerError `protobuf:"varint,2,opt,name=error_code,json=errorCode,enum=pulsar.proto.ServerError" json:"error_code,omitempty"`
+ ErrorMessage *string `protobuf:"bytes,3,opt,name=error_message,json=errorMessage" json:"error_message,omitempty"`
+ // Total rate of messages delivered to the consumer, in msg/s.
+ MsgRateOut *float64 `protobuf:"fixed64,4,opt,name=msgRateOut" json:"msgRateOut,omitempty"`
+ // Total throughput delivered to the consumer, in bytes/s.
+ MsgThroughputOut *float64 `protobuf:"fixed64,5,opt,name=msgThroughputOut" json:"msgThroughputOut,omitempty"`
+ // Total rate of messages redelivered by this consumer, in msg/s.
+ MsgRateRedeliver *float64 `protobuf:"fixed64,6,opt,name=msgRateRedeliver" json:"msgRateRedeliver,omitempty"`
+ // Name of the consumer.
+ ConsumerName *string `protobuf:"bytes,7,opt,name=consumerName" json:"consumerName,omitempty"`
+ // Number of available message permits for the consumer.
+ AvailablePermits *uint64 `protobuf:"varint,8,opt,name=availablePermits" json:"availablePermits,omitempty"`
+ // Number of unacknowledged messages for the consumer.
+ UnackedMessages *uint64 `protobuf:"varint,9,opt,name=unackedMessages" json:"unackedMessages,omitempty"`
+ // Flag to verify if the consumer is blocked due to reaching the threshold of unacked messages.
+ BlockedConsumerOnUnackedMsgs *bool `protobuf:"varint,10,opt,name=blockedConsumerOnUnackedMsgs" json:"blockedConsumerOnUnackedMsgs,omitempty"`
+ // Address of this consumer.
+ Address *string `protobuf:"bytes,11,opt,name=address" json:"address,omitempty"`
+ // Timestamp of connection.
+ ConnectedSince *string `protobuf:"bytes,12,opt,name=connectedSince" json:"connectedSince,omitempty"`
+ // Whether this subscription is Exclusive or Shared or Failover.
+ Type *string `protobuf:"bytes,13,opt,name=type" json:"type,omitempty"`
+ // Total rate of messages expired on this subscription, in msg/s.
+ MsgRateExpired *float64 `protobuf:"fixed64,14,opt,name=msgRateExpired" json:"msgRateExpired,omitempty"`
+ // Number of messages in the subscription backlog.
+ MsgBacklog *uint64 `protobuf:"varint,15,opt,name=msgBacklog" json:"msgBacklog,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+ }
+
+ func (m *CommandConsumerStatsResponse) Reset() { *m = CommandConsumerStatsResponse{} } // Reset overwrites *m with an empty CommandConsumerStatsResponse.
+ func (m *CommandConsumerStatsResponse) String() string { return proto.CompactTextString(m) } // String formats m with proto.CompactTextString.
+ func (*CommandConsumerStatsResponse) ProtoMessage() {} // ProtoMessage is a no-op (empty body).
+ func (*CommandConsumerStatsResponse) Descriptor() ([]byte, []int) { // Descriptor returns the file descriptor variable and the index path {34}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{34}
+ }
+ func (m *CommandConsumerStatsResponse) XXX_Unmarshal(b []byte) error { // This and the remaining XXX_ methods forward to xxx_messageInfo_CommandConsumerStatsResponse.
+ return xxx_messageInfo_CommandConsumerStatsResponse.Unmarshal(m, b)
+ }
+ func (m *CommandConsumerStatsResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandConsumerStatsResponse.Marshal(b, m, deterministic)
+ }
+ func (dst *CommandConsumerStatsResponse) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandConsumerStatsResponse.Merge(dst, src)
+ }
+ func (m *CommandConsumerStatsResponse) XXX_Size() int {
+ return xxx_messageInfo_CommandConsumerStatsResponse.Size(m)
+ }
+ func (m *CommandConsumerStatsResponse) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandConsumerStatsResponse.DiscardUnknown(m)
+ }
+ // xxx_messageInfo_CommandConsumerStatsResponse is the proto.InternalMessageInfo that the XXX_ methods of this type call into.
+ var xxx_messageInfo_CommandConsumerStatsResponse proto.InternalMessageInfo
+ // GetRequestId returns *m.RequestId, or 0 when m or the field is nil.
+ func (m *CommandConsumerStatsResponse) GetRequestId() uint64 {
+ if m != nil && m.RequestId != nil {
+ return *m.RequestId
+ }
+ return 0
+ }
+ // GetErrorCode returns *m.ErrorCode, or ServerError_UnknownError when m or the field is nil.
+ func (m *CommandConsumerStatsResponse) GetErrorCode() ServerError {
+ if m != nil && m.ErrorCode != nil {
+ return *m.ErrorCode
+ }
+ return ServerError_UnknownError
+ }
+ // GetErrorMessage returns *m.ErrorMessage, or "" when m or the field is nil.
+ func (m *CommandConsumerStatsResponse) GetErrorMessage() string {
+ if m != nil && m.ErrorMessage != nil {
+ return *m.ErrorMessage
+ }
+ return ""
+ }
+ // GetMsgRateOut returns *m.MsgRateOut, or 0 when m or the field is nil.
+ func (m *CommandConsumerStatsResponse) GetMsgRateOut() float64 {
+ if m != nil && m.MsgRateOut != nil {
+ return *m.MsgRateOut
+ }
+ return 0
+ }
+ // GetMsgThroughputOut returns *m.MsgThroughputOut, or 0 when m or the field is nil.
+ func (m *CommandConsumerStatsResponse) GetMsgThroughputOut() float64 {
+ if m != nil && m.MsgThroughputOut != nil {
+ return *m.MsgThroughputOut
+ }
+ return 0
+ }
+ // GetMsgRateRedeliver returns *m.MsgRateRedeliver, or 0 when m or the field is nil.
+ func (m *CommandConsumerStatsResponse) GetMsgRateRedeliver() float64 {
+ if m != nil && m.MsgRateRedeliver != nil {
+ return *m.MsgRateRedeliver
+ }
+ return 0
+ }
+ // GetConsumerName returns *m.ConsumerName, or "" when m or the field is nil.
+ func (m *CommandConsumerStatsResponse) GetConsumerName() string {
+ if m != nil && m.ConsumerName != nil {
+ return *m.ConsumerName
+ }
+ return ""
+ }
+ // GetAvailablePermits returns *m.AvailablePermits, or 0 when m or the field is nil.
+ func (m *CommandConsumerStatsResponse) GetAvailablePermits() uint64 {
+ if m != nil && m.AvailablePermits != nil {
+ return *m.AvailablePermits
+ }
+ return 0
+ }
+ // GetUnackedMessages returns *m.UnackedMessages, or 0 when m or the field is nil.
+ func (m *CommandConsumerStatsResponse) GetUnackedMessages() uint64 {
+ if m != nil && m.UnackedMessages != nil {
+ return *m.UnackedMessages
+ }
+ return 0
+ }
+ // GetBlockedConsumerOnUnackedMsgs returns *m.BlockedConsumerOnUnackedMsgs, or false when m or the field is nil.
+ func (m *CommandConsumerStatsResponse) GetBlockedConsumerOnUnackedMsgs() bool {
+ if m != nil && m.BlockedConsumerOnUnackedMsgs != nil {
+ return *m.BlockedConsumerOnUnackedMsgs
+ }
+ return false
+ }
+ // GetAddress returns *m.Address, or "" when m or the field is nil.
+ func (m *CommandConsumerStatsResponse) GetAddress() string {
+ if m != nil && m.Address != nil {
+ return *m.Address
+ }
+ return ""
+ }
+ // GetConnectedSince returns *m.ConnectedSince, or "" when m or the field is nil.
+ func (m *CommandConsumerStatsResponse) GetConnectedSince() string {
+ if m != nil && m.ConnectedSince != nil {
+ return *m.ConnectedSince
+ }
+ return ""
+ }
+ // GetType returns *m.Type, or "" when m or the field is nil.
+ func (m *CommandConsumerStatsResponse) GetType() string {
+ if m != nil && m.Type != nil {
+ return *m.Type
+ }
+ return ""
+ }
+ // GetMsgRateExpired returns *m.MsgRateExpired, or 0 when m or the field is nil.
+ func (m *CommandConsumerStatsResponse) GetMsgRateExpired() float64 {
+ if m != nil && m.MsgRateExpired != nil {
+ return *m.MsgRateExpired
+ }
+ return 0
+ }
+ // GetMsgBacklog returns *m.MsgBacklog, or 0 when m or the field is nil.
+ func (m *CommandConsumerStatsResponse) GetMsgBacklog() uint64 {
+ if m != nil && m.MsgBacklog != nil {
+ return *m.MsgBacklog
+ }
+ return 0
+ }
+
+ type CommandGetLastMessageId struct { // CommandGetLastMessageId is a generated protobuf message type with two required fields: consumer_id (1) and request_id (2).
+ ConsumerId *uint64 `protobuf:"varint,1,req,name=consumer_id,json=consumerId" json:"consumer_id,omitempty"`
+ RequestId *uint64 `protobuf:"varint,2,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+ }
+
+ func (m *CommandGetLastMessageId) Reset() { *m = CommandGetLastMessageId{} } // Reset overwrites *m with an empty CommandGetLastMessageId.
+ func (m *CommandGetLastMessageId) String() string { return proto.CompactTextString(m) } // String formats m with proto.CompactTextString.
+ func (*CommandGetLastMessageId) ProtoMessage() {} // ProtoMessage is a no-op (empty body).
+ func (*CommandGetLastMessageId) Descriptor() ([]byte, []int) { // Descriptor returns the file descriptor variable and the index path {35}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{35}
+ }
+ func (m *CommandGetLastMessageId) XXX_Unmarshal(b []byte) error { // This and the remaining XXX_ methods forward to xxx_messageInfo_CommandGetLastMessageId.
+ return xxx_messageInfo_CommandGetLastMessageId.Unmarshal(m, b)
+ }
+ func (m *CommandGetLastMessageId) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandGetLastMessageId.Marshal(b, m, deterministic)
+ }
+ func (dst *CommandGetLastMessageId) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandGetLastMessageId.Merge(dst, src)
+ }
+ func (m *CommandGetLastMessageId) XXX_Size() int {
+ return xxx_messageInfo_CommandGetLastMessageId.Size(m)
+ }
+ func (m *CommandGetLastMessageId) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandGetLastMessageId.DiscardUnknown(m)
+ }
+ // xxx_messageInfo_CommandGetLastMessageId is the proto.InternalMessageInfo that the XXX_ methods of this type call into.
+ var xxx_messageInfo_CommandGetLastMessageId proto.InternalMessageInfo
+ // GetConsumerId returns *m.ConsumerId, or 0 when m or the field is nil.
+ func (m *CommandGetLastMessageId) GetConsumerId() uint64 {
+ if m != nil && m.ConsumerId != nil {
+ return *m.ConsumerId
+ }
+ return 0
+ }
+ // GetRequestId returns *m.RequestId, or 0 when m or the field is nil.
+ func (m *CommandGetLastMessageId) GetRequestId() uint64 {
+ if m != nil && m.RequestId != nil {
+ return *m.RequestId
+ }
+ return 0
+ }
+
+ type CommandGetLastMessageIdResponse struct { // CommandGetLastMessageIdResponse is a generated protobuf message type with two required fields: last_message_id (1) and request_id (2).
+ LastMessageId *MessageIdData `protobuf:"bytes,1,req,name=last_message_id,json=lastMessageId" json:"last_message_id,omitempty"`
+ RequestId *uint64 `protobuf:"varint,2,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+ }
+
+ func (m *CommandGetLastMessageIdResponse) Reset() { *m = CommandGetLastMessageIdResponse{} } // Reset overwrites *m with an empty CommandGetLastMessageIdResponse.
+ func (m *CommandGetLastMessageIdResponse) String() string { return proto.CompactTextString(m) } // String formats m with proto.CompactTextString.
+ func (*CommandGetLastMessageIdResponse) ProtoMessage() {} // ProtoMessage is a no-op (empty body).
+ func (*CommandGetLastMessageIdResponse) Descriptor() ([]byte, []int) { // Descriptor returns the file descriptor variable and the index path {36}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{36}
+ }
+ func (m *CommandGetLastMessageIdResponse) XXX_Unmarshal(b []byte) error { // This and the remaining XXX_ methods forward to xxx_messageInfo_CommandGetLastMessageIdResponse.
+ return xxx_messageInfo_CommandGetLastMessageIdResponse.Unmarshal(m, b)
+ }
+ func (m *CommandGetLastMessageIdResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandGetLastMessageIdResponse.Marshal(b, m, deterministic)
+ }
+ func (dst *CommandGetLastMessageIdResponse) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandGetLastMessageIdResponse.Merge(dst, src)
+ }
+ func (m *CommandGetLastMessageIdResponse) XXX_Size() int {
+ return xxx_messageInfo_CommandGetLastMessageIdResponse.Size(m)
+ }
+ func (m *CommandGetLastMessageIdResponse) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandGetLastMessageIdResponse.DiscardUnknown(m)
+ }
+ // xxx_messageInfo_CommandGetLastMessageIdResponse is the proto.InternalMessageInfo that the XXX_ methods of this type call into.
+ var xxx_messageInfo_CommandGetLastMessageIdResponse proto.InternalMessageInfo
+ // GetLastMessageId returns the m.LastMessageId pointer as stored, or nil when m is nil.
+ func (m *CommandGetLastMessageIdResponse) GetLastMessageId() *MessageIdData {
+ if m != nil {
+ return m.LastMessageId
+ }
+ return nil
+ }
+ // GetRequestId returns *m.RequestId, or 0 when m or the field is nil.
+ func (m *CommandGetLastMessageIdResponse) GetRequestId() uint64 {
+ if m != nil && m.RequestId != nil {
+ return *m.RequestId
+ }
+ return 0
+ }
+
+ type CommandGetTopicsOfNamespace struct { // CommandGetTopicsOfNamespace is a generated protobuf message type: request_id and namespace are required, mode is optional with default 0 ("def=0").
+ RequestId *uint64 `protobuf:"varint,1,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+ Namespace *string `protobuf:"bytes,2,req,name=namespace" json:"namespace,omitempty"`
+ Mode *CommandGetTopicsOfNamespace_Mode `protobuf:"varint,3,opt,name=mode,enum=pulsar.proto.CommandGetTopicsOfNamespace_Mode,def=0" json:"mode,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+ }
+
+ func (m *CommandGetTopicsOfNamespace) Reset() { *m = CommandGetTopicsOfNamespace{} } // Reset overwrites *m with an empty CommandGetTopicsOfNamespace.
+ func (m *CommandGetTopicsOfNamespace) String() string { return proto.CompactTextString(m) } // String formats m with proto.CompactTextString.
+ func (*CommandGetTopicsOfNamespace) ProtoMessage() {} // ProtoMessage is a no-op (empty body).
+ func (*CommandGetTopicsOfNamespace) Descriptor() ([]byte, []int) { // Descriptor returns the file descriptor variable and the index path {37}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{37}
+ }
+ func (m *CommandGetTopicsOfNamespace) XXX_Unmarshal(b []byte) error { // This and the remaining XXX_ methods forward to xxx_messageInfo_CommandGetTopicsOfNamespace.
+ return xxx_messageInfo_CommandGetTopicsOfNamespace.Unmarshal(m, b)
+ }
+ func (m *CommandGetTopicsOfNamespace) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandGetTopicsOfNamespace.Marshal(b, m, deterministic)
+ }
+ func (dst *CommandGetTopicsOfNamespace) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandGetTopicsOfNamespace.Merge(dst, src)
+ }
+ func (m *CommandGetTopicsOfNamespace) XXX_Size() int {
+ return xxx_messageInfo_CommandGetTopicsOfNamespace.Size(m)
+ }
+ func (m *CommandGetTopicsOfNamespace) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandGetTopicsOfNamespace.DiscardUnknown(m)
+ }
+ // xxx_messageInfo_CommandGetTopicsOfNamespace is the proto.InternalMessageInfo that the XXX_ methods of this type call into.
+ var xxx_messageInfo_CommandGetTopicsOfNamespace proto.InternalMessageInfo
+ // Default_CommandGetTopicsOfNamespace_Mode is the value GetMode falls back to when Mode is unset: PERSISTENT.
+ const Default_CommandGetTopicsOfNamespace_Mode CommandGetTopicsOfNamespace_Mode = CommandGetTopicsOfNamespace_PERSISTENT
+ // GetRequestId returns *m.RequestId, or 0 when m or the field is nil.
+ func (m *CommandGetTopicsOfNamespace) GetRequestId() uint64 {
+ if m != nil && m.RequestId != nil {
+ return *m.RequestId
+ }
+ return 0
+ }
+ // GetNamespace returns *m.Namespace, or "" when m or the field is nil.
+ func (m *CommandGetTopicsOfNamespace) GetNamespace() string {
+ if m != nil && m.Namespace != nil {
+ return *m.Namespace
+ }
+ return ""
+ }
+ // GetMode returns *m.Mode, or Default_CommandGetTopicsOfNamespace_Mode when m or the field is nil.
+ func (m *CommandGetTopicsOfNamespace) GetMode() CommandGetTopicsOfNamespace_Mode {
+ if m != nil && m.Mode != nil {
+ return *m.Mode
+ }
+ return Default_CommandGetTopicsOfNamespace_Mode
+ }
+
+ type CommandGetTopicsOfNamespaceResponse struct { // CommandGetTopicsOfNamespaceResponse is a generated protobuf message type: required request_id (1) plus a repeated string field topics (2).
+ RequestId *uint64 `protobuf:"varint,1,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+ Topics []string `protobuf:"bytes,2,rep,name=topics" json:"topics,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+ }
+
+ func (m *CommandGetTopicsOfNamespaceResponse) Reset() { *m = CommandGetTopicsOfNamespaceResponse{} } // Reset overwrites *m with an empty CommandGetTopicsOfNamespaceResponse.
+ func (m *CommandGetTopicsOfNamespaceResponse) String() string { return proto.CompactTextString(m) } // String formats m with proto.CompactTextString.
+ func (*CommandGetTopicsOfNamespaceResponse) ProtoMessage() {} // ProtoMessage is a no-op (empty body).
+ func (*CommandGetTopicsOfNamespaceResponse) Descriptor() ([]byte, []int) { // Descriptor returns the file descriptor variable and the index path {38}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{38}
+ }
+ func (m *CommandGetTopicsOfNamespaceResponse) XXX_Unmarshal(b []byte) error { // This and the remaining XXX_ methods forward to xxx_messageInfo_CommandGetTopicsOfNamespaceResponse.
+ return xxx_messageInfo_CommandGetTopicsOfNamespaceResponse.Unmarshal(m, b)
+ }
+ func (m *CommandGetTopicsOfNamespaceResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandGetTopicsOfNamespaceResponse.Marshal(b, m, deterministic)
+ }
+ func (dst *CommandGetTopicsOfNamespaceResponse) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandGetTopicsOfNamespaceResponse.Merge(dst, src)
+ }
+ func (m *CommandGetTopicsOfNamespaceResponse) XXX_Size() int {
+ return xxx_messageInfo_CommandGetTopicsOfNamespaceResponse.Size(m)
+ }
+ func (m *CommandGetTopicsOfNamespaceResponse) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandGetTopicsOfNamespaceResponse.DiscardUnknown(m)
+ }
+ // xxx_messageInfo_CommandGetTopicsOfNamespaceResponse is the proto.InternalMessageInfo that the XXX_ methods of this type call into.
+ var xxx_messageInfo_CommandGetTopicsOfNamespaceResponse proto.InternalMessageInfo
+ // GetRequestId returns *m.RequestId, or 0 when m or the field is nil.
+ func (m *CommandGetTopicsOfNamespaceResponse) GetRequestId() uint64 {
+ if m != nil && m.RequestId != nil {
+ return *m.RequestId
+ }
+ return 0
+ }
+ // GetTopics returns the m.Topics slice as stored (no copy), or nil when m is nil.
+ func (m *CommandGetTopicsOfNamespaceResponse) GetTopics() []string {
+ if m != nil {
+ return m.Topics
+ }
+ return nil
+ }
+
+ type CommandGetSchema struct { // CommandGetSchema is a generated protobuf message type: request_id and topic are required, schema_version is optional.
+ RequestId *uint64 `protobuf:"varint,1,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+ Topic *string `protobuf:"bytes,2,req,name=topic" json:"topic,omitempty"`
+ SchemaVersion []byte `protobuf:"bytes,3,opt,name=schema_version,json=schemaVersion" json:"schema_version,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+ }
+
+ func (m *CommandGetSchema) Reset() { *m = CommandGetSchema{} } // Reset overwrites *m with an empty CommandGetSchema.
+ func (m *CommandGetSchema) String() string { return proto.CompactTextString(m) } // String formats m with proto.CompactTextString.
+ func (*CommandGetSchema) ProtoMessage() {} // ProtoMessage is a no-op (empty body).
+ func (*CommandGetSchema) Descriptor() ([]byte, []int) { // Descriptor returns the file descriptor variable and the index path {39}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{39}
+ }
+ func (m *CommandGetSchema) XXX_Unmarshal(b []byte) error { // This and the remaining XXX_ methods forward to xxx_messageInfo_CommandGetSchema.
+ return xxx_messageInfo_CommandGetSchema.Unmarshal(m, b)
+ }
+ func (m *CommandGetSchema) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandGetSchema.Marshal(b, m, deterministic)
+ }
+ func (dst *CommandGetSchema) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandGetSchema.Merge(dst, src)
+ }
+ func (m *CommandGetSchema) XXX_Size() int {
+ return xxx_messageInfo_CommandGetSchema.Size(m)
+ }
+ func (m *CommandGetSchema) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandGetSchema.DiscardUnknown(m)
+ }
+ // xxx_messageInfo_CommandGetSchema is the proto.InternalMessageInfo that the XXX_ methods of CommandGetSchema call into.
+ var xxx_messageInfo_CommandGetSchema proto.InternalMessageInfo
+ // GetRequestId returns *m.RequestId, or 0 when m or the field is nil.
+ func (m *CommandGetSchema) GetRequestId() uint64 {
+ if m != nil && m.RequestId != nil {
+ return *m.RequestId
+ }
+ return 0
+ }
+ // GetTopic returns *m.Topic, or "" when m or the field is nil.
+ func (m *CommandGetSchema) GetTopic() string {
+ if m != nil && m.Topic != nil {
+ return *m.Topic
+ }
+ return ""
+ }
+ // GetSchemaVersion returns the m.SchemaVersion slice as stored (no copy), or nil when m is nil.
+ func (m *CommandGetSchema) GetSchemaVersion() []byte {
+ if m != nil {
+ return m.SchemaVersion
+ }
+ return nil
+ }
+
+ type CommandGetSchemaResponse struct { // CommandGetSchemaResponse is a generated protobuf message type: request_id is required; error_code, error_message, schema and schema_version are optional.
+ RequestId *uint64 `protobuf:"varint,1,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+ ErrorCode *ServerError `protobuf:"varint,2,opt,name=error_code,json=errorCode,enum=pulsar.proto.ServerError" json:"error_code,omitempty"`
+ ErrorMessage *string `protobuf:"bytes,3,opt,name=error_message,json=errorMessage" json:"error_message,omitempty"`
+ Schema *Schema `protobuf:"bytes,4,opt,name=schema" json:"schema,omitempty"`
+ SchemaVersion []byte `protobuf:"bytes,5,opt,name=schema_version,json=schemaVersion" json:"schema_version,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+ }
+
+ func (m *CommandGetSchemaResponse) Reset() { *m = CommandGetSchemaResponse{} } // Reset overwrites *m with an empty CommandGetSchemaResponse.
+ func (m *CommandGetSchemaResponse) String() string { return proto.CompactTextString(m) } // String formats m with proto.CompactTextString.
+ func (*CommandGetSchemaResponse) ProtoMessage() {} // ProtoMessage is a no-op (empty body).
+ func (*CommandGetSchemaResponse) Descriptor() ([]byte, []int) { // Descriptor returns the file descriptor variable and the index path {40}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{40}
+ }
+ func (m *CommandGetSchemaResponse) XXX_Unmarshal(b []byte) error { // This and the remaining XXX_ methods forward to xxx_messageInfo_CommandGetSchemaResponse.
+ return xxx_messageInfo_CommandGetSchemaResponse.Unmarshal(m, b)
+ }
+ func (m *CommandGetSchemaResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CommandGetSchemaResponse.Marshal(b, m, deterministic)
+ }
+ func (dst *CommandGetSchemaResponse) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CommandGetSchemaResponse.Merge(dst, src)
+ }
+ func (m *CommandGetSchemaResponse) XXX_Size() int {
+ return xxx_messageInfo_CommandGetSchemaResponse.Size(m)
+ }
+ func (m *CommandGetSchemaResponse) XXX_DiscardUnknown() {
+ xxx_messageInfo_CommandGetSchemaResponse.DiscardUnknown(m)
+ }
+ // xxx_messageInfo_CommandGetSchemaResponse is the proto.InternalMessageInfo that the XXX_ methods of this type call into.
+ var xxx_messageInfo_CommandGetSchemaResponse proto.InternalMessageInfo
+ // GetRequestId returns *m.RequestId, or 0 when m or the field is nil.
+ func (m *CommandGetSchemaResponse) GetRequestId() uint64 {
+ if m != nil && m.RequestId != nil {
+ return *m.RequestId
+ }
+ return 0
+ }
+ // GetErrorCode returns *m.ErrorCode, or ServerError_UnknownError when m or the field is nil.
+ func (m *CommandGetSchemaResponse) GetErrorCode() ServerError {
+ if m != nil && m.ErrorCode != nil {
+ return *m.ErrorCode
+ }
+ return ServerError_UnknownError
+ }
+ // GetErrorMessage returns *m.ErrorMessage, or "" when m or the field is nil.
+ func (m *CommandGetSchemaResponse) GetErrorMessage() string {
+ if m != nil && m.ErrorMessage != nil {
+ return *m.ErrorMessage
+ }
+ return ""
+ }
+ // GetSchema returns the m.Schema pointer as stored, or nil when m is nil.
+ func (m *CommandGetSchemaResponse) GetSchema() *Schema {
+ if m != nil {
+ return m.Schema
+ }
+ return nil
+ }
+ // GetSchemaVersion returns the m.SchemaVersion slice as stored (no copy), or nil when m is nil.
+ func (m *CommandGetSchemaResponse) GetSchemaVersion() []byte {
+ if m != nil {
+ return m.SchemaVersion
+ }
+ return nil
+ }
+
+ type BaseCommand struct { // BaseCommand pairs a required Type enum (field 1) with one optional sub-message pointer per command (fields 2-35). NOTE(review): presumably Type says which pointer is populated - confirm against the protocol docs.
+ Type *BaseCommand_Type `protobuf:"varint,1,req,name=type,enum=pulsar.proto.BaseCommand_Type" json:"type,omitempty"`
+ Connect *CommandConnect `protobuf:"bytes,2,opt,name=connect" json:"connect,omitempty"`
+ Connected *CommandConnected `protobuf:"bytes,3,opt,name=connected" json:"connected,omitempty"`
+ Subscribe *CommandSubscribe `protobuf:"bytes,4,opt,name=subscribe" json:"subscribe,omitempty"`
+ Producer *CommandProducer `protobuf:"bytes,5,opt,name=producer" json:"producer,omitempty"`
+ Send *CommandSend `protobuf:"bytes,6,opt,name=send" json:"send,omitempty"`
+ SendReceipt *CommandSendReceipt `protobuf:"bytes,7,opt,name=send_receipt,json=sendReceipt" json:"send_receipt,omitempty"`
+ SendError *CommandSendError `protobuf:"bytes,8,opt,name=send_error,json=sendError" json:"send_error,omitempty"`
+ Message *CommandMessage `protobuf:"bytes,9,opt,name=message" json:"message,omitempty"`
+ Ack *CommandAck `protobuf:"bytes,10,opt,name=ack" json:"ack,omitempty"`
+ Flow *CommandFlow `protobuf:"bytes,11,opt,name=flow" json:"flow,omitempty"`
+ Unsubscribe *CommandUnsubscribe `protobuf:"bytes,12,opt,name=unsubscribe" json:"unsubscribe,omitempty"`
+ Success *CommandSuccess `protobuf:"bytes,13,opt,name=success" json:"success,omitempty"`
+ Error *CommandError `protobuf:"bytes,14,opt,name=error" json:"error,omitempty"`
+ CloseProducer *CommandCloseProducer `protobuf:"bytes,15,opt,name=close_producer,json=closeProducer" json:"close_producer,omitempty"`
+ CloseConsumer *CommandCloseConsumer `protobuf:"bytes,16,opt,name=close_consumer,json=closeConsumer" json:"close_consumer,omitempty"`
+ ProducerSuccess *CommandProducerSuccess `protobuf:"bytes,17,opt,name=producer_success,json=producerSuccess" json:"producer_success,omitempty"`
+ Ping *CommandPing `protobuf:"bytes,18,opt,name=ping" json:"ping,omitempty"`
+ Pong *CommandPong `protobuf:"bytes,19,opt,name=pong" json:"pong,omitempty"`
+ RedeliverUnacknowledgedMessages *CommandRedeliverUnacknowledgedMessages `protobuf:"bytes,20,opt,name=redeliverUnacknowledgedMessages" json:"redeliverUnacknowledgedMessages,omitempty"`
+ PartitionMetadata *CommandPartitionedTopicMetadata `protobuf:"bytes,21,opt,name=partitionMetadata" json:"partitionMetadata,omitempty"`
+ PartitionMetadataResponse *CommandPartitionedTopicMetadataResponse `protobuf:"bytes,22,opt,name=partitionMetadataResponse" json:"partitionMetadataResponse,omitempty"`
+ LookupTopic *CommandLookupTopic `protobuf:"bytes,23,opt,name=lookupTopic" json:"lookupTopic,omitempty"`
+ LookupTopicResponse *CommandLookupTopicResponse `protobuf:"bytes,24,opt,name=lookupTopicResponse" json:"lookupTopicResponse,omitempty"`
+ ConsumerStats *CommandConsumerStats `protobuf:"bytes,25,opt,name=consumerStats" json:"consumerStats,omitempty"`
+ ConsumerStatsResponse *CommandConsumerStatsResponse `protobuf:"bytes,26,opt,name=consumerStatsResponse" json:"consumerStatsResponse,omitempty"`
+ ReachedEndOfTopic *CommandReachedEndOfTopic `protobuf:"bytes,27,opt,name=reachedEndOfTopic" json:"reachedEndOfTopic,omitempty"`
+ Seek *CommandSeek `protobuf:"bytes,28,opt,name=seek" json:"seek,omitempty"`
+ GetLastMessageId *CommandGetLastMessageId `protobuf:"bytes,29,opt,name=getLastMessageId" json:"getLastMessageId,omitempty"`
+ GetLastMessageIdResponse *CommandGetLastMessageIdResponse `protobuf:"bytes,30,opt,name=getLastMessageIdResponse" json:"getLastMessageIdResponse,omitempty"`
+ ActiveConsumerChange *CommandActiveConsumerChange `protobuf:"bytes,31,opt,name=active_consumer_change,json=activeConsumerChange" json:"active_consumer_change,omitempty"`
+ GetTopicsOfNamespace *CommandGetTopicsOfNamespace `protobuf:"bytes,32,opt,name=getTopicsOfNamespace" json:"getTopicsOfNamespace,omitempty"`
+ GetTopicsOfNamespaceResponse *CommandGetTopicsOfNamespaceResponse `protobuf:"bytes,33,opt,name=getTopicsOfNamespaceResponse" json:"getTopicsOfNamespaceResponse,omitempty"`
+ GetSchema *CommandGetSchema `protobuf:"bytes,34,opt,name=getSchema" json:"getSchema,omitempty"`
+ GetSchemaResponse *CommandGetSchemaResponse `protobuf:"bytes,35,opt,name=getSchemaResponse" json:"getSchemaResponse,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+ }
+
+ func (m *BaseCommand) Reset() { *m = BaseCommand{} } // Reset overwrites *m with an empty BaseCommand.
+ func (m *BaseCommand) String() string { return proto.CompactTextString(m) } // String formats m with proto.CompactTextString.
+ func (*BaseCommand) ProtoMessage() {} // ProtoMessage is a no-op (empty body).
+ func (*BaseCommand) Descriptor() ([]byte, []int) { // Descriptor returns the file descriptor variable and the index path {41}.
+ return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{41}
+ }
+ func (m *BaseCommand) XXX_Unmarshal(b []byte) error { // This and the remaining XXX_ methods forward to xxx_messageInfo_BaseCommand.
+ return xxx_messageInfo_BaseCommand.Unmarshal(m, b)
+ }
+ func (m *BaseCommand) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_BaseCommand.Marshal(b, m, deterministic)
+ }
+ func (dst *BaseCommand) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_BaseCommand.Merge(dst, src)
+ }
+ func (m *BaseCommand) XXX_Size() int {
+ return xxx_messageInfo_BaseCommand.Size(m)
+ }
+ func (m *BaseCommand) XXX_DiscardUnknown() {
+ xxx_messageInfo_BaseCommand.DiscardUnknown(m)
+ }
+ // xxx_messageInfo_BaseCommand is the proto.InternalMessageInfo that the XXX_ methods of BaseCommand call into.
+ var xxx_messageInfo_BaseCommand proto.InternalMessageInfo
+ // GetType returns *m.Type, or BaseCommand_CONNECT when m or the field is nil.
+ func (m *BaseCommand) GetType() BaseCommand_Type {
+ if m != nil && m.Type != nil {
+ return *m.Type
+ }
+ return BaseCommand_CONNECT
+ }
+ // GetConnect returns m.Connect, or nil when m is nil.
+ func (m *BaseCommand) GetConnect() *CommandConnect {
+ if m != nil {
+ return m.Connect
+ }
+ return nil
+ }
+ // GetConnected returns m.Connected, or nil when m is nil.
+ func (m *BaseCommand) GetConnected() *CommandConnected {
+ if m != nil {
+ return m.Connected
+ }
+ return nil
+ }
+ // GetSubscribe returns m.Subscribe, or nil when m is nil.
+ func (m *BaseCommand) GetSubscribe() *CommandSubscribe {
+ if m != nil {
+ return m.Subscribe
+ }
+ return nil
+ }
+ // GetProducer returns m.Producer, or nil when m is nil.
+ func (m *BaseCommand) GetProducer() *CommandProducer {
+ if m != nil {
+ return m.Producer
+ }
+ return nil
+ }
+ // GetSend returns m.Send, or nil when m is nil.
+ func (m *BaseCommand) GetSend() *CommandSend {
+ if m != nil {
+ return m.Send
+ }
+ return nil
+ }
+ // GetSendReceipt returns m.SendReceipt, or nil when m is nil.
+ func (m *BaseCommand) GetSendReceipt() *CommandSendReceipt {
+ if m != nil {
+ return m.SendReceipt
+ }
+ return nil
+ }
+ // GetSendError returns m.SendError, or nil when m is nil.
+ func (m *BaseCommand) GetSendError() *CommandSendError {
+ if m != nil {
+ return m.SendError
+ }
+ return nil
+ }
+ // GetMessage returns m.Message, or nil when m is nil.
+ func (m *BaseCommand) GetMessage() *CommandMessage {
+ if m != nil {
+ return m.Message
+ }
+ return nil
+ }
+ // GetAck returns m.Ack, or nil when m is nil.
+ func (m *BaseCommand) GetAck() *CommandAck {
+ if m != nil {
+ return m.Ack
+ }
+ return nil
+ }
+ // GetFlow returns m.Flow, or nil when m is nil.
+ func (m *BaseCommand) GetFlow() *CommandFlow {
+ if m != nil {
+ return m.Flow
+ }
+ return nil
+ }
+ // GetUnsubscribe returns m.Unsubscribe, or nil when m is nil.
+ func (m *BaseCommand) GetUnsubscribe() *CommandUnsubscribe {
+ if m != nil {
+ return m.Unsubscribe
+ }
+ return nil
+ }
+ // GetSuccess returns m.Success, or nil when m is nil.
+ func (m *BaseCommand) GetSuccess() *CommandSuccess {
+ if m != nil {
+ return m.Success
+ }
+ return nil
+ }
+ // GetError returns m.Error, or nil when m is nil.
+ func (m *BaseCommand) GetError() *CommandError {
+ if m != nil {
+ return m.Error
+ }
+ return nil
+ }
+ // GetCloseProducer returns m.CloseProducer, or nil when m is nil.
+ func (m *BaseCommand) GetCloseProducer() *CommandCloseProducer {
+ if m != nil {
+ return m.CloseProducer
+ }
+ return nil
+ }
+ // GetCloseConsumer returns m.CloseConsumer, or nil when m is nil.
+ func (m *BaseCommand) GetCloseConsumer() *CommandCloseConsumer {
+ if m != nil {
+ return m.CloseConsumer
+ }
+ return nil
+ }
+ // GetProducerSuccess returns m.ProducerSuccess, or nil when m is nil.
+ func (m *BaseCommand) GetProducerSuccess() *CommandProducerSuccess {
+ if m != nil {
+ return m.ProducerSuccess
+ }
+ return nil
+ }
+ // GetPing returns m.Ping, or nil when m is nil.
+ func (m *BaseCommand) GetPing() *CommandPing {
+ if m != nil {
+ return m.Ping
+ }
+ return nil
+ }
+ // GetPong returns m.Pong, or nil when m is nil.
+ func (m *BaseCommand) GetPong() *CommandPong {
+ if m != nil {
+ return m.Pong
+ }
+ return nil
+ }
+ // GetRedeliverUnacknowledgedMessages returns m.RedeliverUnacknowledgedMessages, or nil when m is nil.
+ func (m *BaseCommand) GetRedeliverUnacknowledgedMessages() *CommandRedeliverUnacknowledgedMessages {
+ if m != nil {
+ return m.RedeliverUnacknowledgedMessages
+ }
+ return nil
+ }
+ // GetPartitionMetadata returns m.PartitionMetadata, or nil when m is nil.
+ func (m *BaseCommand) GetPartitionMetadata() *CommandPartitionedTopicMetadata {
+ if m != nil {
+ return m.PartitionMetadata
+ }
+ return nil
+ }
+ // GetPartitionMetadataResponse returns m.PartitionMetadataResponse, or nil when m is nil.
+ func (m *BaseCommand) GetPartitionMetadataResponse() *CommandPartitionedTopicMetadataResponse {
+ if m != nil {
+ return m.PartitionMetadataResponse
+ }
+ return nil
+ }
+ // GetLookupTopic returns m.LookupTopic, or nil when m is nil.
+ func (m *BaseCommand) GetLookupTopic() *CommandLookupTopic {
+ if m != nil {
+ return m.LookupTopic
+ }
+ return nil
+ }
+ // GetLookupTopicResponse returns m.LookupTopicResponse, or nil when m is nil.
+ func (m *BaseCommand) GetLookupTopicResponse() *CommandLookupTopicResponse {
+ if m != nil {
+ return m.LookupTopicResponse
+ }
+ return nil
+ }
+ // GetConsumerStats returns m.ConsumerStats, or nil when m is nil.
+ func (m *BaseCommand) GetConsumerStats() *CommandConsumerStats {
+ if m != nil {
+ return m.ConsumerStats
+ }
+ return nil
+ }
+ // GetConsumerStatsResponse returns m.ConsumerStatsResponse, or nil when m is nil.
+ func (m *BaseCommand) GetConsumerStatsResponse() *CommandConsumerStatsResponse {
+ if m != nil {
+ return m.ConsumerStatsResponse
+ }
+ return nil
+ }
+ // GetReachedEndOfTopic returns m.ReachedEndOfTopic, or nil when m is nil.
+ func (m *BaseCommand) GetReachedEndOfTopic() *CommandReachedEndOfTopic {
+ if m != nil {
+ return m.ReachedEndOfTopic
+ }
+ return nil
+ }
+ // GetSeek returns m.Seek, or nil when m is nil.
+ func (m *BaseCommand) GetSeek() *CommandSeek {
+ if m != nil {
+ return m.Seek
+ }
+ return nil
+ }
+ // GetGetLastMessageId returns m.GetLastMessageId, or nil when m is nil.
+ func (m *BaseCommand) GetGetLastMessageId() *CommandGetLastMessageId {
+ if m != nil {
+ return m.GetLastMessageId
+ }
+ return nil
+ }
+ // GetGetLastMessageIdResponse returns m.GetLastMessageIdResponse, or nil when m is nil.
+ func (m *BaseCommand) GetGetLastMessageIdResponse() *CommandGetLastMessageIdResponse {
+ if m != nil {
+ return m.GetLastMessageIdResponse
+ }
+ return nil
+ }
+ // GetActiveConsumerChange returns m.ActiveConsumerChange, or nil when m is nil.
+ func (m *BaseCommand) GetActiveConsumerChange() *CommandActiveConsumerChange {
+ if m != nil {
+ return m.ActiveConsumerChange
+ }
+ return nil
+ }
+ // GetGetTopicsOfNamespace returns m.GetTopicsOfNamespace, or nil when m is nil.
+ func (m *BaseCommand) GetGetTopicsOfNamespace() *CommandGetTopicsOfNamespace {
+ if m != nil {
+ return m.GetTopicsOfNamespace
+ }
+ return nil
+ }
+ // GetGetTopicsOfNamespaceResponse returns m.GetTopicsOfNamespaceResponse, or nil when m is nil.
+ func (m *BaseCommand) GetGetTopicsOfNamespaceResponse() *CommandGetTopicsOfNamespaceResponse {
+ if m != nil {
+ return m.GetTopicsOfNamespaceResponse
+ }
+ return nil
+ }
+ // GetGetSchema returns m.GetSchema, or nil when m is nil.
+ func (m *BaseCommand) GetGetSchema() *CommandGetSchema {
+ if m != nil {
+ return m.GetSchema
+ }
+ return nil
+ }
+ // GetGetSchemaResponse returns m.GetSchemaResponse, or nil when m is nil.
+ func (m *BaseCommand) GetGetSchemaResponse() *CommandGetSchemaResponse {
+ if m != nil {
+ return m.GetSchemaResponse
+ }
+ return nil
+ }
+
+func init() { // Registers every generated message type, then every enum, under its "pulsar.proto."-qualified name.
+ proto.RegisterType((*Schema)(nil), "pulsar.proto.Schema") // (*T)(nil) is a typed nil pointer: it carries the Go type without allocating a value
+ proto.RegisterType((*MessageIdData)(nil), "pulsar.proto.MessageIdData")
+ proto.RegisterType((*KeyValue)(nil), "pulsar.proto.KeyValue")
+ proto.RegisterType((*KeyLongValue)(nil), "pulsar.proto.KeyLongValue")
+ proto.RegisterType((*EncryptionKeys)(nil), "pulsar.proto.EncryptionKeys")
+ proto.RegisterType((*MessageMetadata)(nil), "pulsar.proto.MessageMetadata")
+ proto.RegisterType((*SingleMessageMetadata)(nil), "pulsar.proto.SingleMessageMetadata")
+ proto.RegisterType((*CommandConnect)(nil), "pulsar.proto.CommandConnect")
+ proto.RegisterType((*CommandConnected)(nil), "pulsar.proto.CommandConnected")
+ proto.RegisterType((*CommandSubscribe)(nil), "pulsar.proto.CommandSubscribe")
+ proto.RegisterType((*CommandPartitionedTopicMetadata)(nil), "pulsar.proto.CommandPartitionedTopicMetadata")
+ proto.RegisterType((*CommandPartitionedTopicMetadataResponse)(nil), "pulsar.proto.CommandPartitionedTopicMetadataResponse")
+ proto.RegisterType((*CommandLookupTopic)(nil), "pulsar.proto.CommandLookupTopic")
+ proto.RegisterType((*CommandLookupTopicResponse)(nil), "pulsar.proto.CommandLookupTopicResponse")
+ proto.RegisterType((*CommandProducer)(nil), "pulsar.proto.CommandProducer")
+ proto.RegisterType((*CommandSend)(nil), "pulsar.proto.CommandSend")
+ proto.RegisterType((*CommandSendReceipt)(nil), "pulsar.proto.CommandSendReceipt")
+ proto.RegisterType((*CommandSendError)(nil), "pulsar.proto.CommandSendError")
+ proto.RegisterType((*CommandMessage)(nil), "pulsar.proto.CommandMessage")
+ proto.RegisterType((*CommandAck)(nil), "pulsar.proto.CommandAck")
+ proto.RegisterType((*CommandActiveConsumerChange)(nil), "pulsar.proto.CommandActiveConsumerChange")
+ proto.RegisterType((*CommandFlow)(nil), "pulsar.proto.CommandFlow")
+ proto.RegisterType((*CommandUnsubscribe)(nil), "pulsar.proto.CommandUnsubscribe")
+ proto.RegisterType((*CommandSeek)(nil), "pulsar.proto.CommandSeek")
+ proto.RegisterType((*CommandReachedEndOfTopic)(nil), "pulsar.proto.CommandReachedEndOfTopic")
+ proto.RegisterType((*CommandCloseProducer)(nil), "pulsar.proto.CommandCloseProducer")
+ proto.RegisterType((*CommandCloseConsumer)(nil), "pulsar.proto.CommandCloseConsumer")
+ proto.RegisterType((*CommandRedeliverUnacknowledgedMessages)(nil), "pulsar.proto.CommandRedeliverUnacknowledgedMessages")
+ proto.RegisterType((*CommandSuccess)(nil), "pulsar.proto.CommandSuccess")
+ proto.RegisterType((*CommandProducerSuccess)(nil), "pulsar.proto.CommandProducerSuccess")
+ proto.RegisterType((*CommandError)(nil), "pulsar.proto.CommandError")
+ proto.RegisterType((*CommandPing)(nil), "pulsar.proto.CommandPing")
+ proto.RegisterType((*CommandPong)(nil), "pulsar.proto.CommandPong")
+ proto.RegisterType((*CommandConsumerStats)(nil), "pulsar.proto.CommandConsumerStats")
+ proto.RegisterType((*CommandConsumerStatsResponse)(nil), "pulsar.proto.CommandConsumerStatsResponse")
+ proto.RegisterType((*CommandGetLastMessageId)(nil), "pulsar.proto.CommandGetLastMessageId")
+ proto.RegisterType((*CommandGetLastMessageIdResponse)(nil), "pulsar.proto.CommandGetLastMessageIdResponse")
+ proto.RegisterType((*CommandGetTopicsOfNamespace)(nil), "pulsar.proto.CommandGetTopicsOfNamespace")
+ proto.RegisterType((*CommandGetTopicsOfNamespaceResponse)(nil), "pulsar.proto.CommandGetTopicsOfNamespaceResponse")
+ proto.RegisterType((*CommandGetSchema)(nil), "pulsar.proto.CommandGetSchema")
+ proto.RegisterType((*CommandGetSchemaResponse)(nil), "pulsar.proto.CommandGetSchemaResponse")
+ proto.RegisterType((*BaseCommand)(nil), "pulsar.proto.BaseCommand")
+ proto.RegisterEnum("pulsar.proto.CompressionType", CompressionType_name, CompressionType_value) // enums: each is registered with its generated _name and _value tables
+ proto.RegisterEnum("pulsar.proto.ServerError", ServerError_name, ServerError_value)
+ proto.RegisterEnum("pulsar.proto.AuthMethod", AuthMethod_name, AuthMethod_value)
+ proto.RegisterEnum("pulsar.proto.ProtocolVersion", ProtocolVersion_name, ProtocolVersion_value)
+ proto.RegisterEnum("pulsar.proto.Schema_Type", Schema_Type_name, Schema_Type_value) // nested enums use an Outer_Inner Go name but a dotted proto name
+ proto.RegisterEnum("pulsar.proto.CommandSubscribe_SubType", CommandSubscribe_SubType_name, CommandSubscribe_SubType_value)
+ proto.RegisterEnum("pulsar.proto.CommandSubscribe_InitialPosition", CommandSubscribe_InitialPosition_name, CommandSubscribe_InitialPosition_value)
+ proto.RegisterEnum("pulsar.proto.CommandPartitionedTopicMetadataResponse_LookupType", CommandPartitionedTopicMetadataResponse_LookupType_name, CommandPartitionedTopicMetadataResponse_LookupType_value)
+ proto.RegisterEnum("pulsar.proto.CommandLookupTopicResponse_LookupType", CommandLookupTopicResponse_LookupType_name, CommandLookupTopicResponse_LookupType_value)
+ proto.RegisterEnum("pulsar.proto.CommandAck_AckType", CommandAck_AckType_name, CommandAck_AckType_value)
+ proto.RegisterEnum("pulsar.proto.CommandAck_ValidationError", CommandAck_ValidationError_name, CommandAck_ValidationError_value)
+ proto.RegisterEnum("pulsar.proto.CommandGetTopicsOfNamespace_Mode", CommandGetTopicsOfNamespace_Mode_name, CommandGetTopicsOfNamespace_Mode_value)
+ proto.RegisterEnum("pulsar.proto.BaseCommand_Type", BaseCommand_Type_name, BaseCommand_Type_value)
+}
+
+func init() { proto.RegisterFile("PulsarApi.proto", fileDescriptor_PulsarApi_9ff3bd5e091a4809) } // registers the gzipped descriptor bytes declared below under the file name "PulsarApi.proto"
+
+var fileDescriptor_PulsarApi_9ff3bd5e091a4809 = []byte{
+ // 4065 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x3b, 0x4d, 0x6f, 0x1b, 0x49,
+ 0x76, 0x6e, 0x7e, 0x48, 0xe4, 0xe3, 0x57, 0xb9, 0x2c, 0x6b, 0xda, 0x1f, 0x63, 0xd3, 0xed, 0xb5,
+ 0x47, 0xe3, 0x1d, 0x2b, 0xb6, 0xec, 0x75, 0x66, 0xbc, 0x9b, 0x60, 0x28, 0xaa, 0x6d, 0x33, 0x96,
+ 0x48, 0x6d, 0x91, 0xf2, 0x62, 0x27, 0xbb, 0xe8, 0x6d, 0x75, 0x97, 0xa9, 0x86, 0x9a, 0xdd, 0x4c,
+ 0x7f, 0x68, 0xac, 0x39, 0xe4, 0x36, 0x08, 0x02, 0x04, 0x08, 0x90, 0x1c, 0x73, 0xc8, 0x21, 0x08,
+ 0x72, 0xce, 0x2d, 0x40, 0x7e, 0x43, 0x6e, 0xb9, 0xe4, 0x94, 0x4b, 0x72, 0x0c, 0x90, 0x43, 0x90,
+ 0x73, 0x50, 0xd5, 0xd5, 0x5f, 0x24, 0x45, 0xca, 0x3b, 0x7b, 0xc8, 0x89, 0xdd, 0xaf, 0x5e, 0xbd,
+ 0x7a, 0xf5, 0xde, 0xab, 0xf7, 0x55, 0x4d, 0x68, 0x1d, 0x86, 0xb6, 0xaf, 0x7b, 0x9d, 0xa9, 0xb5,
+ 0x3d, 0xf5, 0xdc, 0xc0, 0xc5, 0xf5, 0x29, 0x07, 0x44, 0x6f, 0xca, 0x7f, 0x48, 0xb0, 0x36, 0x34,
+ 0x4e, 0xe8, 0x44, 0xc7, 0x18, 0x4a, 0x8e, 0x3e, 0xa1, 0xb2, 0xd4, 0x2e, 0x6c, 0x55, 0x09, 0x7f,
+ 0xc6, 0x77, 0xa1, 0xe6, 0xf3, 0x51, 0xcd, 0xd4, 0x03, 0x5d, 0x2e, 0xb6, 0x0b, 0x5b, 0x75, 0x02,
+ 0x11, 0x68, 0x4f, 0x0f, 0x74, 0xfc, 0x18, 0x4a, 0xc1, 0xf9, 0x94, 0xca, 0xa5, 0x76, 0x61, 0xab,
+ 0xb9, 0x73, 0x63, 0x3b, 0x4b, 0x7c, 0x3b, 0x22, 0xbc, 0x3d, 0x3a, 0x9f, 0x52, 0xc2, 0xd1, 0xf0,
+ 0x0b, 0x80, 0xa9, 0xe7, 0x4e, 0xa9, 0x17, 0x58, 0xd4, 0x97, 0xcb, 0xed, 0xe2, 0x56, 0x6d, 0x67,
+ 0x33, 0x3f, 0xe9, 0x2d, 0x3d, 0x7f, 0xa7, 0xdb, 0x21, 0x25, 0x19, 0x4c, 0xe5, 0x0f, 0xa1, 0xc4,
+ 0xa8, 0xe0, 0x0a, 0x94, 0xfa, 0xae, 0x43, 0xd1, 0x15, 0x0c, 0xb0, 0x36, 0x0c, 0x3c, 0xcb, 0x19,
+ 0x23, 0x89, 0x41, 0xff, 0xc8, 0x77, 0x1d, 0x54, 0xc0, 0x75, 0xa8, 0x1c, 0x32, 0x2a, 0xc7, 0xe1,
+ 0x7b, 0x54, 0x64, 0xf0, 0xce, 0x99, 0xe7, 0xa2, 0x92, 0xf2, 0x17, 0x12, 0x34, 0x0e, 0xa8, 0xef,
+ 0xeb, 0x63, 0xda, 0x33, 0x39, 0xe3, 0x37, 0xa1, 0x62, 0x53, 0x73, 0x4c, 0xbd, 0x9e, 0xc9, 0x77,
+ 0x5c, 0x22, 0xc9, 0x3b, 0x96, 0x61, 0x9d, 0x3a, 0x81, 0x77, 0xde, 0x33, 0xe5, 0x02, 0x1f, 0x8a,
+ 0x5f, 0x71, 0x1b, 0xaa, 0x53, 0xdd, 0x0b, 0xac, 0xc0, 0x72, 0x1d, 0xb9, 0xd8, 0x96, 0xb6, 0xca,
+ 0x2f, 0x0b, 0x8f, 0x9f, 0x92, 0x14, 0x88, 0xef, 0x43, 0xed, 0x58, 0x0f, 0x8c, 0x13, 0xcd, 0x72,
+ 0x4c, 0xfa, 0x41, 0x2e, 0x25, 0x38, 0xc0, 0xc1, 0x3d, 0x06, 0x55, 0x76, 0xa0, 0x12, 0x6f, 0x13,
+ 0x23, 0x28, 0x9e, 0xd2, 0x73, 0x21, 0x75, 0xf6, 0x88, 0x37, 0xa0, 0x7c, 0xc6, 0x86, 0xf8, 0xe2,
+ 0x55, 0x12, 0xbd, 0x28, 0x2f, 0xa0, 0xfe, 0x96, 0x9e, 0xef, 0xbb, 0xce, 0xf8, 0x52, 0xf3, 0x4a,
+ 0xf1, 0x3c, 0x1b, 0x9a, 0xaa, 0x63, 0x78, 0xe7, 0x53, 0xc6, 0xde, 0x5b, 0x7a, 0xee, 0xaf, 0x9a,
+ 0x59, 0x17, 0x33, 0xf1, 0x0e, 0x54, 0x26, 0x34, 0xd0, 0x85, 0xe6, 0x97, 0xa9, 0x2a, 0xc1, 0x53,
+ 0xfe, 0xb7, 0x0c, 0x2d, 0x21, 0xe8, 0x03, 0x01, 0xc3, 0xf7, 0xa1, 0x31, 0xf5, 0x5c, 0x33, 0x34,
+ 0xa8, 0xa7, 0x65, 0x2c, 0xac, 0x1e, 0x03, 0xfb, 0xb1, 0xa5, 0xd1, 0x3f, 0x09, 0xa9, 0x63, 0x50,
+ 0xcd, 0x8a, 0xe5, 0x0e, 0x31, 0xa8, 0x67, 0xe2, 0x7b, 0x50, 0x9f, 0x86, 0xc7, 0xb6, 0xe5, 0x9f,
+ 0x68, 0x81, 0x35, 0xa1, 0xdc, 0x16, 0x4b, 0xa4, 0x26, 0x60, 0x23, 0x6b, 0x32, 0x6b, 0x5d, 0xa5,
+ 0xcb, 0x5a, 0x17, 0xfe, 0x0c, 0x5a, 0x1e, 0x9d, 0xda, 0x96, 0xa1, 0x07, 0xd4, 0xd4, 0xde, 0x7b,
+ 0xee, 0x44, 0x2e, 0xb7, 0xa5, 0xad, 0x2a, 0x69, 0xa6, 0xe0, 0x57, 0x9e, 0x3b, 0xe1, 0x3b, 0x89,
+ 0x35, 0xad, 0x31, 0x19, 0xae, 0x71, 0xb4, 0x7a, 0x02, 0x7c, 0x4b, 0xcf, 0x19, 0xa3, 0xc9, 0x34,
+ 0x2d, 0x70, 0xe5, 0xf5, 0x76, 0x71, 0xab, 0x4a, 0x6a, 0x09, 0x6c, 0xe4, 0x62, 0x15, 0x6a, 0x86,
+ 0x3b, 0x99, 0x7a, 0xd4, 0xf7, 0x99, 0x21, 0x55, 0xda, 0xd2, 0x56, 0x73, 0xe7, 0xd3, 0x3c, 0xa7,
+ 0xdd, 0x14, 0x81, 0x99, 0xfe, 0xcb, 0x52, 0x7f, 0xd0, 0x57, 0x49, 0x76, 0x1e, 0xde, 0x86, 0xab,
+ 0xa1, 0x13, 0x03, 0xa8, 0xa9, 0xf9, 0xd6, 0x77, 0x54, 0xae, 0xb6, 0xa5, 0xad, 0xc6, 0x4b, 0xe9,
+ 0x09, 0x41, 0xd9, 0xb1, 0xa1, 0xf5, 0x1d, 0xc5, 0xcf, 0xe1, 0xba, 0x13, 0x4e, 0xb4, 0x49, 0xa4,
+ 0x1f, 0x5f, 0xb3, 0x1c, 0x8d, 0x1b, 0xa5, 0x5c, 0xe3, 0x56, 0x2a, 0x3d, 0x25, 0xd8, 0x09, 0x27,
+ 0x42, 0x7d, 0x7e, 0xcf, 0xd9, 0x65, 0x83, 0xb8, 0x0d, 0x40, 0xcf, 0xa8, 0x13, 0x44, 0x62, 0xaf,
+ 0xb7, 0xa5, 0xad, 0x12, 0x23, 0x5f, 0xe5, 0x40, 0x2e, 0x77, 0x15, 0x5a, 0x34, 0x31, 0x31, 0x26,
+ 0x17, 0x5f, 0x6e, 0x70, 0xe1, 0xdf, 0xce, 0x6f, 0x29, 0x6f, 0x87, 0xa4, 0x49, 0xf3, 0x76, 0xf9,
+ 0x59, 0x8e, 0x8c, 0x6e, 0x8f, 0x5d, 0xb9, 0x19, 0xa9, 0x21, 0x05, 0x77, 0xec, 0xb1, 0x8b, 0x3f,
+ 0x07, 0x94, 0x41, 0x9c, 0xea, 0x9e, 0x3e, 0x91, 0x5b, 0x6d, 0x69, 0xab, 0x4e, 0x32, 0x04, 0x0e,
+ 0x19, 0x18, 0x3f, 0x80, 0xa6, 0x70, 0x60, 0x67, 0xd4, 0xe3, 0xc2, 0x46, 0x1c, 0xb1, 0x11, 0x41,
+ 0xdf, 0x45, 0x40, 0xfc, 0x35, 0xdc, 0xc8, 0x29, 0x56, 0x3b, 0x7e, 0xf1, 0x5c, 0xa3, 0x8e, 0xe1,
+ 0x9a, 0xd4, 0x94, 0xaf, 0xb6, 0xa5, 0xad, 0xca, 0xcb, 0xf2, 0x7b, 0xdd, 0xf6, 0x29, 0xd9, 0xcc,
+ 0xea, 0x7a, 0xf7, 0xc5, 0x73, 0x35, 0x42, 0x52, 0xfe, 0xa1, 0x00, 0xd7, 0x87, 0x96, 0x33, 0xb6,
+ 0xe9, 0xac, 0xf9, 0xe7, 0xad, 0x52, 0xba, 0xb4, 0x55, 0xce, 0x19, 0x5b, 0x61, 0xb1, 0xb1, 0x4d,
+ 0xf5, 0x73, 0xdb, 0xd5, 0x85, 0xf6, 0xd9, 0xa9, 0x28, 0x93, 0x9a, 0x80, 0x71, 0xad, 0x3f, 0x82,
+ 0x06, 0xb3, 0x03, 0xdd, 0x60, 0xc6, 0xed, 0x86, 0x01, 0xf7, 0x49, 0xc9, 0x7e, 0xea, 0xc9, 0xd8,
+ 0x20, 0x0c, 0x66, 0x74, 0x5d, 0x5e, 0xa0, 0xeb, 0xa5, 0x92, 0x5a, 0xbb, 0x8c, 0xa4, 0xfe, 0xbe,
+ 0x08, 0xcd, 0xae, 0x3b, 0x99, 0xe8, 0x8e, 0xd9, 0x75, 0x1d, 0x87, 0x1a, 0x01, 0xd3, 0x92, 0x61,
+ 0x5b, 0x6c, 0xdd, 0x58, 0x4b, 0x91, 0x8b, 0x68, 0x44, 0xd0, 0x58, 0x4b, 0x5f, 0x41, 0x4d, 0x0f,
+ 0x83, 0x13, 0x6d, 0x42, 0x83, 0x13, 0xd7, 0xe4, 0xf2, 0x68, 0xee, 0xc8, 0x79, 0x51, 0x76, 0xc2,
+ 0xe0, 0xe4, 0x80, 0x8f, 0x13, 0xd0, 0x93, 0x67, 0xbc, 0x05, 0x28, 0x33, 0x35, 0x72, 0x43, 0xe2,
+ 0x8c, 0xa7, 0x58, 0xdc, 0x11, 0xdd, 0x82, 0x2a, 0xc7, 0x14, 0x6e, 0x8f, 0x19, 0x4b, 0x85, 0x01,
+ 0x78, 0xd4, 0xf8, 0x02, 0x10, 0x5f, 0xc6, 0x70, 0xed, 0x84, 0xd5, 0xc8, 0xc5, 0x4b, 0x4f, 0x48,
+ 0x2b, 0x1e, 0x8a, 0xf9, 0x7d, 0x0c, 0xd7, 0xa6, 0x9e, 0xfb, 0xe1, 0x5c, 0x0b, 0x5c, 0xed, 0xd8,
+ 0x73, 0x4f, 0xa9, 0xa7, 0x85, 0x9e, 0x2d, 0x9c, 0x06, 0xe2, 0x43, 0x23, 0x77, 0x97, 0x0f, 0x1c,
+ 0x79, 0x36, 0x7e, 0x0c, 0xd8, 0xf5, 0xac, 0xb1, 0xe5, 0xe8, 0xb6, 0x36, 0xf5, 0x2c, 0xc7, 0xb0,
+ 0xa6, 0xba, 0x2d, 0xaf, 0x73, 0xec, 0xab, 0xf1, 0xc8, 0x61, 0x3c, 0x80, 0xbf, 0xc8, 0xa0, 0xa7,
+ 0x1c, 0x57, 0x22, 0xe2, 0xf1, 0x48, 0x27, 0xe6, 0xfc, 0x09, 0x6c, 0xe4, 0xb1, 0x85, 0x10, 0xab,
+ 0x1c, 0x1f, 0x67, 0xf1, 0x23, 0x61, 0x28, 0x63, 0x40, 0x79, 0x35, 0x51, 0x93, 0x1f, 0x27, 0xea,
+ 0x9d, 0x51, 0x6f, 0x56, 0x51, 0x11, 0x34, 0xde, 0xf8, 0x22, 0x31, 0x15, 0x2e, 0x12, 0x93, 0xf2,
+ 0x2f, 0xe5, 0x64, 0xa5, 0x61, 0x78, 0xec, 0x1b, 0x9e, 0x75, 0x4c, 0x59, 0x48, 0x0a, 0xdc, 0xa9,
+ 0x65, 0x88, 0x05, 0xa2, 0x17, 0xac, 0x40, 0xdd, 0x8f, 0x50, 0xf8, 0x19, 0x17, 0x11, 0x32, 0x07,
+ 0xc3, 0x5f, 0xc3, 0xba, 0x1f, 0x1e, 0x33, 0x9f, 0xc9, 0x4f, 0x43, 0x73, 0xe7, 0xe1, 0x9c, 0x63,
+ 0xcd, 0x2d, 0xb5, 0x3d, 0x8c, 0xb0, 0x49, 0x3c, 0x8d, 0xc5, 0x22, 0xc3, 0x75, 0xfc, 0x70, 0x42,
+ 0x3d, 0x16, 0x8b, 0x4a, 0x51, 0x2c, 0x8a, 0x41, 0x3d, 0x13, 0x7f, 0x0a, 0xe0, 0xb1, 0xc8, 0xe4,
+ 0x07, 0x6c, 0xbc, 0xcc, 0xc7, 0xab, 0x02, 0xd2, 0x33, 0xd9, 0xc9, 0x4d, 0xe6, 0x73, 0x4b, 0x13,
+ 0x61, 0x22, 0x06, 0x72, 0x3b, 0x7b, 0x00, 0xcd, 0xa9, 0x67, 0xb9, 0x9e, 0x15, 0x9c, 0x6b, 0x36,
+ 0x3d, 0xa3, 0x91, 0xa6, 0xcb, 0xa4, 0x11, 0x43, 0xf7, 0x19, 0x10, 0xdf, 0x81, 0x75, 0x33, 0xf4,
+ 0xf4, 0x63, 0x9b, 0x72, 0xd5, 0x56, 0x5e, 0x96, 0x02, 0x2f, 0xa4, 0x24, 0x06, 0x62, 0x15, 0x90,
+ 0x1f, 0xe8, 0x5e, 0x10, 0x7b, 0x75, 0xc6, 0x10, 0xd3, 0x69, 0x6d, 0xe7, 0x56, 0x7e, 0xdb, 0xb9,
+ 0xf4, 0x87, 0x34, 0xf9, 0xa4, 0x04, 0x96, 0x8b, 0xf5, 0x70, 0xb9, 0x58, 0xcf, 0x76, 0xe0, 0x51,
+ 0xdd, 0xd4, 0x12, 0x0f, 0xc2, 0xe3, 0x48, 0x85, 0x34, 0x18, 0xb4, 0x1b, 0x03, 0xf1, 0x17, 0xb0,
+ 0x16, 0x39, 0x5b, 0x1e, 0x3b, 0x6a, 0x3b, 0x1b, 0x8b, 0x92, 0x44, 0x22, 0x70, 0xf0, 0x6f, 0xa0,
+ 0x65, 0x39, 0x56, 0x60, 0xe9, 0xf6, 0xa1, 0xeb, 0x47, 0x79, 0x56, 0x83, 0x9f, 0xf3, 0xed, 0x15,
+ 0x5a, 0xec, 0xe5, 0x67, 0xbd, 0x5c, 0xdb, 0xd7, 0x03, 0xea, 0x07, 0x64, 0x96, 0x9c, 0xb2, 0x03,
+ 0xeb, 0x42, 0xe3, 0xb8, 0x01, 0x55, 0xf5, 0x83, 0x61, 0x87, 0xbe, 0x75, 0x16, 0xe7, 0x94, 0x27,
+ 0xba, 0x47, 0x4d, 0x24, 0xb1, 0x4c, 0xf2, 0x95, 0x6e, 0xd9, 0xee, 0x19, 0xf5, 0x50, 0x41, 0xf9,
+ 0x31, 0xb4, 0x66, 0xe8, 0x33, 0xe4, 0x68, 0x05, 0x74, 0x85, 0x21, 0xab, 0xba, 0x67, 0x5b, 0xec,
+ 0x4d, 0x52, 0xfe, 0x53, 0x82, 0xbb, 0x82, 0xbd, 0xc3, 0xd8, 0x05, 0x52, 0x73, 0xc4, 0x0c, 0x38,
+ 0x09, 0x0a, 0x8b, 0xcd, 0x3b, 0x6f, 0x57, 0x85, 0x59, 0xbb, 0x5a, 0xec, 0x20, 0x8a, 0x1f, 0xe7,
+ 0x20, 0x4a, 0x1f, 0xe9, 0x20, 0xca, 0x17, 0x3a, 0x88, 0x7f, 0x2a, 0xc0, 0x67, 0x2b, 0xf6, 0x49,
+ 0xa8, 0x3f, 0x75, 0x1d, 0x9f, 0xe2, 0x3b, 0x00, 0x49, 0x38, 0x60, 0x41, 0x50, 0xda, 0x6a, 0x90,
+ 0x0c, 0x64, 0xd5, 0xce, 0x7f, 0x05, 0x15, 0x4f, 0x90, 0xe2, 0xfb, 0x6d, 0xee, 0x7c, 0xbd, 0xd0,
+ 0x1c, 0x56, 0xf1, 0xb1, 0xbd, 0xef, 0xba, 0xa7, 0xe1, 0x94, 0x1f, 0xf7, 0x84, 0x22, 0xfe, 0x3d,
+ 0x28, 0x53, 0xcf, 0x73, 0x3d, 0x2e, 0x9b, 0xf9, 0x2a, 0x86, 0xbb, 0x36, 0x95, 0x21, 0x90, 0x08,
+ 0x8f, 0x15, 0x08, 0xe2, 0xb8, 0x09, 0xf1, 0xc4, 0xaf, 0xca, 0x03, 0x80, 0x74, 0x09, 0x5c, 0x63,
+ 0xa6, 0x66, 0x18, 0xd4, 0xf7, 0x23, 0xeb, 0x62, 0x16, 0xc5, 0xac, 0x4b, 0xf9, 0xbe, 0x00, 0x58,
+ 0xb0, 0x2c, 0xd0, 0xb9, 0xfe, 0x7f, 0x2b, 0xab, 0xf8, 0x31, 0x34, 0x98, 0xbe, 0x98, 0xcf, 0xd0,
+ 0x03, 0xeb, 0x2c, 0x12, 0x50, 0x12, 0x85, 0xf3, 0x63, 0x17, 0x98, 0x50, 0xe9, 0xe3, 0x4c, 0xa8,
+ 0xfc, 0x91, 0x26, 0xb4, 0x76, 0xa1, 0x09, 0xfd, 0x5b, 0x11, 0x6e, 0xce, 0xcb, 0x21, 0xb1, 0x9a,
+ 0x47, 0x80, 0xa2, 0xb8, 0xc9, 0x74, 0x60, 0x19, 0xf4, 0xc8, 0xb3, 0xb9, 0xed, 0x54, 0xc9, 0x1c,
+ 0x1c, 0x3f, 0x81, 0x6b, 0xb3, 0xb0, 0x91, 0xed, 0x8b, 0xa4, 0x69, 0xd1, 0x10, 0x1e, 0xcc, 0x19,
+ 0xd5, 0xb3, 0x85, 0x46, 0xb5, 0x80, 0xb3, 0xc5, 0x76, 0x94, 0x57, 0x54, 0x69, 0xa5, 0xa2, 0xca,
+ 0x4b, 0x14, 0x95, 0xd8, 0xe4, 0xda, 0xc7, 0xdb, 0xe4, 0x7a, 0xce, 0x26, 0x79, 0xca, 0x16, 0xa5,
+ 0x21, 0x27, 0x9e, 0x1b, 0x8e, 0x4f, 0x34, 0x3f, 0x12, 0x03, 0x4f, 0x46, 0x2a, 0xf9, 0x94, 0x8d,
+ 0xe7, 0x24, 0x11, 0x5a, 0x2a, 0x2c, 0xe5, 0x59, 0xce, 0xaa, 0xeb, 0x50, 0x21, 0xd4, 0xb4, 0x3c,
+ 0x6a, 0x30, 0xdf, 0x57, 0x83, 0x75, 0x91, 0x1f, 0x20, 0x29, 0x63, 0xe3, 0x05, 0xe5, 0xaf, 0x0b,
+ 0xd0, 0x8a, 0x8f, 0xa5, 0xa8, 0xf4, 0x2e, 0x30, 0xf0, 0xbb, 0x50, 0x4b, 0x0a, 0xc4, 0xb4, 0xf6,
+ 0x8b, 0x41, 0x73, 0xf1, 0xb6, 0xb8, 0x20, 0xde, 0xe6, 0x0b, 0xcc, 0x92, 0xc8, 0x94, 0xb3, 0x05,
+ 0xe6, 0x7d, 0xa8, 0x8a, 0xe2, 0x80, 0x9a, 0x79, 0xc9, 0xa7, 0xf0, 0x5c, 0x18, 0x5c, 0xbb, 0x64,
+ 0x18, 0x4c, 0xe3, 0xdb, 0xfa, 0xea, 0xf8, 0xa6, 0x84, 0x50, 0x8b, 0x43, 0x17, 0x75, 0xcc, 0xd9,
+ 0xad, 0x4b, 0x73, 0x5b, 0x5f, 0x59, 0x17, 0xff, 0x08, 0xea, 0xd9, 0xa2, 0x4e, 0x74, 0x25, 0xa4,
+ 0xa7, 0xa4, 0x96, 0xa9, 0xe5, 0x94, 0xbf, 0x92, 0x12, 0x87, 0xc3, 0xd6, 0x25, 0xd4, 0xa0, 0xd6,
+ 0x34, 0xf8, 0x1d, 0x2c, 0xff, 0x12, 0x20, 0x93, 0x79, 0x14, 0x57, 0x67, 0x1e, 0xd5, 0x49, 0xfc,
+ 0xaa, 0xfc, 0xad, 0x94, 0x26, 0x7e, 0xd4, 0x31, 0xb9, 0x39, 0xff, 0x0e, 0x58, 0x4a, 0x8e, 0x4e,
+ 0x71, 0x61, 0x53, 0x6a, 0xe9, 0xd1, 0x29, 0x71, 0xbb, 0x4c, 0xdc, 0xf9, 0xdf, 0x48, 0x49, 0xad,
+ 0x22, 0x76, 0x31, 0x9b, 0x1c, 0x4a, 0x73, 0xc9, 0x61, 0x5e, 0x22, 0x8c, 0xbd, 0x4b, 0x4b, 0x84,
+ 0x25, 0xce, 0x1e, 0x35, 0xa9, 0x6d, 0x9d, 0x51, 0xef, 0x5c, 0x33, 0xdc, 0xd0, 0x09, 0xb8, 0x4c,
+ 0x79, 0x41, 0xdf, 0x4a, 0x87, 0xba, 0x6c, 0x44, 0xf9, 0x9f, 0x22, 0x80, 0xe0, 0xae, 0x63, 0x9c,
+ 0xae, 0xe6, 0xec, 0xa7, 0x50, 0xd1, 0x8d, 0x53, 0x8d, 0x37, 0xec, 0x0a, 0x5c, 0x36, 0xed, 0x85,
+ 0x0e, 0xaf, 0x63, 0x9c, 0x6e, 0x77, 0x8c, 0xd3, 0x28, 0x29, 0xd6, 0xa3, 0x87, 0x39, 0x45, 0x17,
+ 0x3f, 0x62, 0x5b, 0x43, 0x40, 0x67, 0xba, 0x6d, 0x99, 0x3a, 0xaf, 0x1a, 0xb3, 0xb1, 0x76, 0xeb,
+ 0x42, 0x06, 0xde, 0x25, 0x13, 0x22, 0x5d, 0xb5, 0xce, 0xf2, 0x00, 0xc6, 0xd0, 0x5c, 0x2f, 0xf1,
+ 0xe6, 0xdc, 0x69, 0x4d, 0x1a, 0x66, 0xb9, 0x7e, 0xe2, 0xe7, 0xb0, 0x2e, 0x36, 0x88, 0x9b, 0x00,
+ 0x3d, 0xc7, 0xb4, 0xce, 0x2c, 0x33, 0xd4, 0x6d, 0x74, 0x85, 0xbd, 0x77, 0xc3, 0x49, 0x68, 0x73,
+ 0x37, 0x8c, 0x24, 0xe5, 0x2f, 0x25, 0x68, 0xcd, 0xf0, 0x82, 0xef, 0xc0, 0xcd, 0xa3, 0x99, 0xe6,
+ 0x4a, 0xd7, 0xf5, 0xbc, 0x90, 0x17, 0x20, 0xe8, 0x0a, 0xde, 0x04, 0xbc, 0x47, 0x33, 0x9d, 0x1a,
+ 0x3e, 0x0b, 0x49, 0x78, 0x03, 0x50, 0xf7, 0x84, 0x1a, 0xa7, 0x7e, 0x38, 0x39, 0xb0, 0xfc, 0x89,
+ 0x1e, 0x18, 0x27, 0xa8, 0x80, 0x6f, 0xc0, 0x75, 0xde, 0x69, 0xd9, 0xa3, 0x43, 0xea, 0x59, 0xba,
+ 0x6d, 0x7d, 0x47, 0xa3, 0x09, 0x45, 0x7c, 0x0d, 0x5a, 0x7b, 0x34, 0xee, 0x68, 0x44, 0xc0, 0x92,
+ 0x72, 0x0c, 0xb7, 0x12, 0x39, 0x31, 0x26, 0xbb, 0x42, 0xc3, 0xdd, 0x13, 0xdd, 0xb9, 0x8c, 0x81,
+ 0x2a, 0x50, 0xb5, 0x7c, 0x4d, 0xe7, 0x73, 0x79, 0x7c, 0x4c, 0x3c, 0x61, 0xc5, 0xf2, 0x23, 0x92,
+ 0xca, 0xbb, 0xc4, 0x4d, 0xbd, 0xb2, 0xdd, 0x6f, 0x57, 0xd3, 0x7c, 0x08, 0x4d, 0xa1, 0xee, 0x43,
+ 0xea, 0x4d, 0xac, 0xc0, 0xe7, 0x06, 0xd6, 0x20, 0x33, 0x50, 0x65, 0x94, 0xb8, 0xa1, 0x23, 0xc7,
+ 0x4f, 0x8a, 0xbd, 0x95, 0xe4, 0x97, 0xa7, 0x40, 0xca, 0x9f, 0x4b, 0x19, 0xaf, 0x4a, 0x4f, 0x7f,
+ 0x28, 0xbd, 0x1f, 0xe4, 0xd4, 0x7e, 0x0a, 0xb2, 0x60, 0x85, 0x50, 0xdd, 0x38, 0xa1, 0xa6, 0xea,
+ 0x98, 0x83, 0xf7, 0xa3, 0x38, 0xd0, 0x2d, 0xe5, 0x4b, 0x79, 0x07, 0x1b, 0x71, 0xcd, 0x6d, 0xbb,
+ 0x3e, 0x4d, 0xe2, 0xe6, 0x4a, 0xa7, 0xb8, 0x42, 0x40, 0x33, 0x74, 0x63, 0x8b, 0xf9, 0xc1, 0x82,
+ 0xff, 0x33, 0x09, 0x1e, 0x26, 0xbb, 0x15, 0xce, 0xe9, 0xc8, 0xd1, 0x8d, 0x53, 0xc7, 0xfd, 0x96,
+ 0xb7, 0xd3, 0x63, 0xb7, 0xe9, 0xaf, 0x5e, 0xea, 0x67, 0x50, 0x4b, 0x85, 0xce, 0xec, 0x67, 0xa5,
+ 0x87, 0x81, 0x44, 0xea, 0xbe, 0xf2, 0xeb, 0xc4, 0x51, 0x8b, 0x8c, 0x7b, 0x86, 0x75, 0x69, 0x56,
+ 0xc7, 0x69, 0xd8, 0x2e, 0x5c, 0x22, 0x6c, 0xff, 0xa3, 0x04, 0x9b, 0x33, 0xc9, 0xcc, 0x25, 0xd7,
+ 0x99, 0x4b, 0x4e, 0x0a, 0x0b, 0xba, 0xdf, 0x5f, 0x00, 0xb2, 0x75, 0x3f, 0xd0, 0xb2, 0x81, 0x8d,
+ 0x99, 0x5d, 0x91, 0x5f, 0x1d, 0x34, 0xd9, 0xd8, 0x30, 0x0d, 0x70, 0xf3, 0x4d, 0xcd, 0xd2, 0x82,
+ 0xa6, 0xa6, 0xf2, 0x01, 0xea, 0x82, 0xe5, 0xc8, 0x6b, 0xad, 0x60, 0x34, 0x09, 0x9b, 0x85, 0x8f,
+ 0x0f, 0x9b, 0xc5, 0x7c, 0xd8, 0x6c, 0x24, 0xc7, 0xf1, 0xd0, 0x72, 0xc6, 0xd9, 0x57, 0xd7, 0x19,
+ 0x67, 0x8d, 0x51, 0x68, 0x7f, 0x18, 0xe8, 0xc1, 0x4a, 0x41, 0xae, 0xea, 0xca, 0x28, 0xff, 0x5d,
+ 0x82, 0xdb, 0x8b, 0x08, 0x93, 0xc5, 0xf9, 0xf9, 0xdc, 0x02, 0x5f, 0x02, 0xf0, 0x8d, 0x69, 0x86,
+ 0x6b, 0x52, 0xd1, 0x5d, 0x5c, 0x22, 0x85, 0x2a, 0x47, 0xee, 0xba, 0x26, 0xcb, 0x2d, 0x1b, 0xd1,
+ 0xcc, 0x54, 0x1e, 0x3c, 0x01, 0xe5, 0xc0, 0x38, 0x71, 0xb8, 0x03, 0x30, 0xf1, 0xc7, 0x44, 0x0f,
+ 0xe8, 0x40, 0x34, 0x61, 0x25, 0x92, 0x81, 0xb0, 0x62, 0x67, 0xe2, 0x8f, 0x45, 0xf2, 0x3d, 0x0d,
+ 0x03, 0x86, 0x55, 0xe6, 0x58, 0x73, 0x70, 0x81, 0xcb, 0x66, 0x26, 0xc7, 0x8e, 0x17, 0x0a, 0x11,
+ 0x6e, 0x0e, 0x8e, 0x15, 0xc8, 0x35, 0x9e, 0x44, 0x75, 0x90, 0x6f, 0x46, 0x3d, 0x02, 0xa4, 0x9f,
+ 0xe9, 0x96, 0xad, 0x1f, 0xdb, 0x89, 0x03, 0x67, 0x95, 0x41, 0x89, 0xcc, 0xc1, 0xf1, 0x16, 0xb4,
+ 0x42, 0x76, 0xc4, 0xd3, 0xb3, 0xcd, 0x1b, 0x4e, 0x25, 0x32, 0x0b, 0xc6, 0xbb, 0x70, 0xfb, 0xd8,
+ 0x76, 0x19, 0x28, 0xd6, 0xc7, 0xc0, 0x39, 0x12, 0x38, 0xfe, 0xd8, 0x97, 0x81, 0xb7, 0x8b, 0x96,
+ 0xe2, 0x30, 0x23, 0xd3, 0x4d, 0x93, 0xc5, 0x51, 0xde, 0x5d, 0xaa, 0x92, 0xf8, 0x95, 0x85, 0x1c,
+ 0x23, 0x6e, 0x4c, 0x0e, 0x2d, 0xc7, 0x88, 0xee, 0x26, 0xaa, 0x64, 0x06, 0x8a, 0xb1, 0xb8, 0xa2,
+ 0x6c, 0xf0, 0xd1, 0xe8, 0x1e, 0x92, 0x85, 0xab, 0x48, 0x4e, 0xea, 0x87, 0xa9, 0xe5, 0x51, 0x93,
+ 0xdf, 0x34, 0x48, 0x64, 0x06, 0x2a, 0x74, 0xb6, 0xab, 0x1b, 0xa7, 0xb6, 0x3b, 0xe6, 0x77, 0x0c,
+ 0x25, 0x92, 0x81, 0x28, 0xbf, 0x84, 0x4f, 0x84, 0xc5, 0xbd, 0xa6, 0xc1, 0xbe, 0xee, 0x67, 0x3a,
+ 0x6a, 0x3f, 0xd4, 0xb5, 0x7e, 0x9f, 0x76, 0x91, 0x66, 0x69, 0x27, 0x06, 0xdd, 0x85, 0x16, 0x77,
+ 0x1b, 0x99, 0x60, 0x25, 0xad, 0xce, 0x37, 0x1b, 0x76, 0x8e, 0xd1, 0x15, 0x7c, 0xfc, 0xbb, 0x94,
+ 0xa4, 0x1b, 0xaf, 0x69, 0xc0, 0xe3, 0x98, 0x3f, 0x78, 0xcf, 0xac, 0xc6, 0x9f, 0xea, 0xc6, 0xca,
+ 0x43, 0x75, 0x1b, 0xaa, 0x4e, 0x8c, 0x2b, 0x5c, 0x5f, 0x0a, 0xc0, 0x7d, 0x28, 0x4d, 0xd8, 0x61,
+ 0x2b, 0x2e, 0x69, 0xf1, 0x2d, 0x5a, 0x75, 0xfb, 0xc0, 0x35, 0xe9, 0x4b, 0x38, 0x54, 0xc9, 0xb0,
+ 0x37, 0x1c, 0xa9, 0xfd, 0x11, 0xe1, 0x74, 0x94, 0x67, 0x50, 0x62, 0x23, 0x2c, 0x89, 0x4b, 0xc7,
+ 0xd0, 0x15, 0x8c, 0xa1, 0xd9, 0x1f, 0xf4, 0xb5, 0x0c, 0x4c, 0xc2, 0xeb, 0x50, 0xec, 0xec, 0xef,
+ 0xa3, 0x82, 0xf2, 0x2b, 0xb8, 0xbf, 0x64, 0xa9, 0xcb, 0x7a, 0x8f, 0x4d, 0x58, 0xe3, 0xd5, 0x6c,
+ 0x14, 0xb9, 0xaa, 0x44, 0xbc, 0x29, 0x4e, 0x52, 0xe3, 0xbc, 0xa6, 0x81, 0xb8, 0x6a, 0x5f, 0x41,
+ 0x2a, 0xa9, 0x92, 0x0b, 0xd9, 0x2a, 0x79, 0xde, 0xeb, 0x17, 0x17, 0x79, 0xfd, 0xff, 0x92, 0x92,
+ 0x04, 0x24, 0x59, 0xf0, 0xff, 0x89, 0x07, 0x4c, 0x43, 0x6e, 0xe9, 0x12, 0x9d, 0xe0, 0xf9, 0xfd,
+ 0x96, 0x17, 0xed, 0xf7, 0x9f, 0x6f, 0x40, 0x6d, 0x57, 0x67, 0x39, 0x0d, 0xdf, 0x33, 0xde, 0x11,
+ 0xc7, 0x5d, 0xe2, 0x51, 0xec, 0x4e, 0x7e, 0x89, 0x0c, 0x62, 0xfe, 0xb3, 0x84, 0x75, 0xe1, 0x34,
+ 0x44, 0x32, 0x70, 0x7b, 0xa1, 0x25, 0x8a, 0x3e, 0x07, 0x89, 0x91, 0xf1, 0xcf, 0xa0, 0x9a, 0x38,
+ 0x1b, 0x91, 0x26, 0xde, 0x59, 0x36, 0x93, 0x9a, 0x24, 0x9d, 0xc0, 0x66, 0x27, 0x29, 0xb0, 0x90,
+ 0xc8, 0x9d, 0xe5, 0x4d, 0x6e, 0x92, 0x4e, 0xc0, 0x5f, 0x41, 0x25, 0x4e, 0x21, 0xb8, 0x60, 0x6a,
+ 0x0b, 0x2e, 0x90, 0xb3, 0xe9, 0x0a, 0x49, 0xd0, 0xf1, 0x63, 0x28, 0xf9, 0xd4, 0x89, 0xfa, 0x72,
+ 0xb5, 0x59, 0x05, 0x67, 0xbb, 0x04, 0x1c, 0x0d, 0x77, 0xa1, 0xce, 0x7e, 0x35, 0x2f, 0x6a, 0x1a,
+ 0x88, 0x36, 0x47, 0xfb, 0xe2, 0x69, 0x11, 0x1e, 0xa9, 0xf9, 0x99, 0x4e, 0xc3, 0x1f, 0x00, 0x70,
+ 0x22, 0x51, 0x8a, 0x51, 0x59, 0xb6, 0xdb, 0xb8, 0x15, 0x40, 0xaa, 0x7e, 0xd2, 0x15, 0x78, 0x91,
+ 0xe6, 0x1a, 0xd5, 0x25, 0x1a, 0x12, 0x96, 0x96, 0xf6, 0xbe, 0x1e, 0x41, 0x51, 0x37, 0x4e, 0x79,
+ 0xa4, 0xa9, 0xcd, 0x5e, 0x15, 0xa6, 0xc5, 0x26, 0x61, 0x48, 0x4c, 0x2c, 0xef, 0x6d, 0xf7, 0x5b,
+ 0x1e, 0x67, 0x2e, 0x12, 0x0b, 0xab, 0x86, 0x08, 0x47, 0xc3, 0xbb, 0x50, 0x0b, 0xd3, 0x1a, 0x46,
+ 0x5c, 0x6e, 0x2c, 0x96, 0x4a, 0xa6, 0xd6, 0x21, 0xd9, 0x49, 0x6c, 0x5b, 0x7e, 0x94, 0x46, 0xf2,
+ 0xf0, 0x74, 0xd1, 0xb6, 0x44, 0xaa, 0x49, 0x62, 0x64, 0xfc, 0x24, 0xce, 0xd5, 0x9a, 0x7c, 0xd6,
+ 0xcd, 0x85, 0xb3, 0x72, 0xc9, 0x5a, 0x0f, 0x9a, 0x06, 0x4b, 0xfd, 0xb5, 0xc4, 0x68, 0x5a, 0x7c,
+ 0xaa, 0xb2, 0xd8, 0x5e, 0xb3, 0xd5, 0x07, 0x69, 0x18, 0xb9, 0x62, 0x24, 0x21, 0x15, 0x07, 0x33,
+ 0x7e, 0xa7, 0xbe, 0x94, 0x54, 0x1c, 0xdb, 0x05, 0xa9, 0xa4, 0xfe, 0x18, 0xf0, 0x8b, 0xc2, 0x28,
+ 0x39, 0x8e, 0x05, 0x71, 0x95, 0x13, 0xfb, 0xd1, 0x52, 0x63, 0x8e, 0x05, 0xd2, 0x9a, 0xce, 0x24,
+ 0xe3, 0x8f, 0xa1, 0x34, 0xb5, 0x9c, 0xb1, 0x8c, 0x97, 0xe8, 0x90, 0xe5, 0xa4, 0x84, 0xa3, 0x71,
+ 0x74, 0xd7, 0x19, 0xcb, 0xd7, 0x96, 0xa1, 0xbb, 0x1c, 0xdd, 0x75, 0xc6, 0xf8, 0x4f, 0xe1, 0xae,
+ 0xb7, 0xbc, 0xcc, 0x91, 0x37, 0x38, 0xa5, 0xe7, 0x0b, 0x29, 0xad, 0x28, 0x91, 0xc8, 0x2a, 0xe2,
+ 0xf8, 0x8f, 0xe1, 0x6a, 0x72, 0x67, 0x12, 0x5f, 0x6d, 0xc8, 0xd7, 0xf9, 0x8a, 0x8f, 0x3f, 0xee,
+ 0x3e, 0x64, 0x9e, 0x0e, 0xf6, 0x33, 0x37, 0xfb, 0xb3, 0xf7, 0x26, 0xf2, 0x26, 0x5f, 0xe4, 0x27,
+ 0xbf, 0xd5, 0xa5, 0x0b, 0xb9, 0x98, 0x2e, 0x3b, 0x44, 0x76, 0xda, 0x5e, 0x97, 0x3f, 0x59, 0x72,
+ 0x88, 0xb2, 0x6d, 0xf8, 0xec, 0x24, 0xfc, 0x0d, 0x5c, 0xb3, 0xe7, 0x5b, 0xf4, 0xb2, 0xcc, 0x69,
+ 0x6d, 0x5d, 0xb6, 0xa5, 0x4f, 0x16, 0x11, 0xc1, 0x6f, 0xd2, 0xab, 0x5c, 0x5e, 0x4b, 0xc8, 0x37,
+ 0x96, 0x99, 0x7a, 0xae, 0xea, 0xc8, 0x4f, 0xc4, 0xbf, 0x81, 0xeb, 0xc6, 0xa2, 0xaa, 0x44, 0xbe,
+ 0xc9, 0x29, 0x3e, 0xba, 0x04, 0xc5, 0x98, 0xd3, 0xc5, 0x84, 0xf0, 0x08, 0xae, 0x7a, 0xb3, 0x2d,
+ 0x07, 0xf9, 0x16, 0xa7, 0xfe, 0xf0, 0x02, 0x7b, 0x9c, 0xc1, 0x26, 0xf3, 0x04, 0xa2, 0x60, 0x41,
+ 0x4f, 0xe5, 0xdb, 0x4b, 0x83, 0x05, 0x3d, 0x25, 0x1c, 0x0d, 0xff, 0x1c, 0xd0, 0x78, 0x26, 0x5d,
+ 0x95, 0x3f, 0xe5, 0x53, 0x1f, 0x5c, 0x94, 0xdd, 0xe5, 0x73, 0xdb, 0xb9, 0xe9, 0xd8, 0x02, 0x79,
+ 0x7c, 0x41, 0x06, 0x2c, 0xdf, 0x59, 0x62, 0xfc, 0x17, 0xa5, 0xcd, 0xe4, 0x42, 0x72, 0x58, 0x83,
+ 0xcd, 0xa8, 0x2f, 0x96, 0xf8, 0x36, 0xcd, 0xe0, 0x5d, 0x35, 0xf9, 0x2e, 0x5f, 0xe8, 0xf3, 0x0b,
+ 0x22, 0xc8, 0x7c, 0x1b, 0x8e, 0x6c, 0xe8, 0x8b, 0x9a, 0x73, 0xbf, 0x86, 0x8d, 0xf1, 0x82, 0x24,
+ 0x53, 0x6e, 0x2f, 0x21, 0xbf, 0x30, 0x2b, 0x5d, 0x48, 0x06, 0x87, 0x70, 0x7b, 0xbc, 0x24, 0x87,
+ 0x95, 0xef, 0xf1, 0x65, 0x9e, 0x5e, 0x7e, 0x99, 0x58, 0x64, 0x4b, 0xc9, 0xb2, 0x4c, 0x66, 0x1c,
+ 0xe7, 0x9a, 0xb2, 0xb2, 0x24, 0xb6, 0xa7, 0x19, 0x69, 0x3a, 0x81, 0xd9, 0xed, 0x78, 0x36, 0x53,
+ 0x95, 0xef, 0x2f, 0xb1, 0xdb, 0xb9, 0xbc, 0x96, 0xcc, 0x13, 0x50, 0xfe, 0xae, 0x2c, 0xbe, 0x19,
+ 0xad, 0xc1, 0x7a, 0x77, 0xd0, 0xef, 0xab, 0xdd, 0x11, 0x2a, 0xe0, 0x06, 0x54, 0xc5, 0x8b, 0xba,
+ 0x87, 0x8a, 0xec, 0x75, 0x78, 0xb4, 0x3b, 0xec, 0x92, 0xde, 0xae, 0x8a, 0x4a, 0xfc, 0xf3, 0x51,
+ 0x32, 0xd8, 0x3b, 0xea, 0xaa, 0x04, 0x95, 0x71, 0x05, 0x4a, 0x43, 0xb5, 0xbf, 0x87, 0xd6, 0x30,
+ 0x82, 0x3a, 0x7b, 0xd2, 0x88, 0xda, 0x55, 0x7b, 0x87, 0x23, 0xb4, 0xce, 0x0a, 0x0c, 0x0e, 0x51,
+ 0x09, 0x19, 0x10, 0x54, 0x61, 0x8b, 0x1c, 0xa8, 0xc3, 0x61, 0xe7, 0xb5, 0x8a, 0xaa, 0xbc, 0xb2,
+ 0xe8, 0xbe, 0x45, 0xc0, 0x28, 0xbc, 0xda, 0x1f, 0xfc, 0x02, 0xd5, 0x70, 0x0b, 0x6a, 0x47, 0xfd,
+ 0x74, 0xa9, 0x3a, 0xbf, 0x1a, 0x3e, 0xea, 0x76, 0xd5, 0xe1, 0x10, 0x35, 0x70, 0x15, 0xca, 0x11,
+ 0xa1, 0x26, 0xab, 0x54, 0xba, 0xfb, 0x83, 0xa1, 0xaa, 0x25, 0x8c, 0xb4, 0x52, 0x58, 0x77, 0xd0,
+ 0x1f, 0x1e, 0x1d, 0xa8, 0x04, 0x21, 0xbc, 0x01, 0x28, 0xc6, 0xd0, 0x62, 0x42, 0x57, 0xd9, 0x82,
+ 0x87, 0xbd, 0xfe, 0x6b, 0x84, 0xf9, 0xd3, 0xa0, 0xff, 0x1a, 0x5d, 0xc3, 0x0f, 0xe0, 0x1e, 0x51,
+ 0xf7, 0xd4, 0xfd, 0xde, 0x3b, 0x95, 0x68, 0x47, 0xfd, 0x4e, 0xf7, 0x6d, 0x7f, 0xf0, 0x8b, 0x7d,
+ 0x75, 0xef, 0xb5, 0xba, 0xa7, 0x09, 0x9e, 0x87, 0x68, 0x03, 0xcb, 0xb0, 0x71, 0xd8, 0x21, 0xa3,
+ 0xde, 0xa8, 0x37, 0xe8, 0xf3, 0x91, 0x51, 0x67, 0xaf, 0x33, 0xea, 0xa0, 0xeb, 0xf8, 0x1e, 0x7c,
+ 0xba, 0x68, 0x44, 0x23, 0xea, 0xf0, 0x70, 0xd0, 0x1f, 0xaa, 0x68, 0x93, 0x7f, 0x0c, 0x31, 0x18,
+ 0xbc, 0x3d, 0x3a, 0x44, 0x9f, 0xe0, 0x6b, 0xd0, 0x8a, 0x9e, 0x53, 0x04, 0x99, 0x6f, 0x41, 0x30,
+ 0xaf, 0x0d, 0x47, 0x9d, 0xd1, 0x10, 0xdd, 0xc0, 0xb7, 0xe0, 0x93, 0x3c, 0x2c, 0x9d, 0x70, 0x93,
+ 0xb1, 0x43, 0xd4, 0x4e, 0xf7, 0x8d, 0xba, 0xa7, 0x31, 0x39, 0x0f, 0x5e, 0x69, 0xa3, 0xc1, 0x61,
+ 0xaf, 0x8b, 0x6e, 0x45, 0x6a, 0x51, 0xdf, 0xa2, 0xdb, 0xf8, 0x13, 0xb8, 0xf6, 0x5a, 0x1d, 0x69,
+ 0xfb, 0x9d, 0xe1, 0x28, 0xde, 0x89, 0xd6, 0xdb, 0x43, 0x9f, 0xe2, 0x36, 0xdc, 0x5e, 0x30, 0x90,
+ 0x92, 0xbf, 0x83, 0x6f, 0xc2, 0x66, 0xa7, 0x3b, 0xea, 0xbd, 0x4b, 0x65, 0xaa, 0x75, 0xdf, 0x74,
+ 0xfa, 0xaf, 0x55, 0x74, 0x97, 0xf1, 0xc5, 0x66, 0xf3, 0xf5, 0x86, 0x6c, 0xe5, 0x7e, 0xe7, 0x40,
+ 0x1d, 0x1e, 0x76, 0xba, 0x2a, 0x6a, 0xe3, 0x1f, 0x41, 0xfb, 0x82, 0xc1, 0x94, 0xfc, 0x3d, 0x66,
+ 0x1e, 0x0c, 0x6b, 0xd8, 0x7d, 0xa3, 0x1e, 0x74, 0x90, 0x12, 0x73, 0x1a, 0xbd, 0xa7, 0x88, 0xf7,
+ 0x1f, 0x7d, 0xc9, 0xef, 0x48, 0xb3, 0x1f, 0x7a, 0xf2, 0x6f, 0x9c, 0x07, 0x7d, 0x15, 0x5d, 0x61,
+ 0x76, 0xb4, 0xff, 0xcd, 0xf3, 0xe8, 0x03, 0xe7, 0x6f, 0xf6, 0x7b, 0xbb, 0xa8, 0xc0, 0x9f, 0x86,
+ 0xa3, 0x3d, 0x54, 0x7c, 0xf4, 0xaf, 0x45, 0xa8, 0x65, 0x8a, 0x31, 0x66, 0xa3, 0x47, 0x0e, 0xcb,
+ 0x19, 0xc4, 0x3d, 0xc1, 0x15, 0x7c, 0x15, 0x1a, 0x71, 0xbc, 0xcd, 0x5c, 0x40, 0x1c, 0xb2, 0xba,
+ 0xc9, 0x0f, 0xa8, 0x63, 0x88, 0x5b, 0x86, 0x02, 0xe3, 0xae, 0x13, 0x06, 0x27, 0xd4, 0x09, 0x2c,
+ 0x23, 0xbd, 0xe5, 0x40, 0x45, 0xbc, 0x09, 0xb8, 0x13, 0xdd, 0x4a, 0x7f, 0x97, 0x81, 0x97, 0xd8,
+ 0x5a, 0xb1, 0x5f, 0xdb, 0x0d, 0xfd, 0x73, 0x54, 0x66, 0x4a, 0x17, 0xf7, 0xc5, 0x7d, 0x37, 0x20,
+ 0x54, 0x37, 0xcf, 0xd1, 0x1a, 0xb3, 0xbc, 0x38, 0x61, 0xdb, 0x8d, 0x7a, 0x3c, 0x3f, 0x0f, 0xdd,
+ 0x40, 0x57, 0x3f, 0x18, 0x94, 0x9a, 0x34, 0xca, 0x4f, 0xd1, 0x3a, 0xfe, 0x1c, 0x1e, 0x2c, 0x45,
+ 0xfb, 0x60, 0xd0, 0xe8, 0x62, 0xa5, 0xc2, 0xb6, 0x14, 0x5f, 0xa0, 0x44, 0xb3, 0xab, 0x4c, 0x5b,
+ 0x2c, 0xbd, 0x9e, 0x4e, 0x5d, 0x2f, 0xa0, 0xa6, 0xa8, 0x0a, 0xa3, 0x41, 0x60, 0xf8, 0xdc, 0x6b,
+ 0xf5, 0xdd, 0xe0, 0x95, 0x1b, 0x3a, 0x26, 0xaa, 0x31, 0xc3, 0x1a, 0x66, 0x3e, 0x17, 0x4b, 0x46,
+ 0xea, 0xfc, 0x76, 0x26, 0x6e, 0x8a, 0xc5, 0xd0, 0x06, 0xdb, 0xd9, 0xc8, 0x75, 0x0f, 0x74, 0xe7,
+ 0x9c, 0x44, 0x75, 0xb2, 0x8f, 0x9a, 0x8c, 0x08, 0xa7, 0x3b, 0xa2, 0xde, 0xc4, 0x72, 0xf4, 0x20,
+ 0xde, 0x4c, 0x8b, 0x89, 0x26, 0xd9, 0x0c, 0x13, 0x0d, 0x3f, 0xa9, 0x3d, 0x87, 0x5f, 0x5e, 0x45,
+ 0xac, 0xe8, 0x13, 0x8a, 0xae, 0x32, 0xd1, 0xf6, 0xf8, 0x15, 0x92, 0x1e, 0x58, 0xc7, 0x36, 0x8d,
+ 0x9c, 0x17, 0xc2, 0x8f, 0xde, 0x02, 0xa4, 0xdf, 0x47, 0xb0, 0x63, 0x93, 0xbe, 0x89, 0x2f, 0xdf,
+ 0xaf, 0x41, 0x2b, 0x85, 0xfd, 0xd2, 0xd0, 0xdf, 0x3d, 0x8d, 0x14, 0x9b, 0x02, 0x3b, 0x4c, 0x97,
+ 0x3e, 0x2a, 0x3c, 0xfa, 0x5e, 0x82, 0xd6, 0xe1, 0xcc, 0x47, 0x89, 0x6b, 0x50, 0x38, 0x7b, 0x82,
+ 0xae, 0xf0, 0x5f, 0x36, 0x93, 0xfd, 0xee, 0xa0, 0x02, 0xff, 0x7d, 0x86, 0x8a, 0xfc, 0xf7, 0x39,
+ 0x2a, 0xf1, 0xdf, 0x9f, 0xa0, 0x32, 0xff, 0x7d, 0x81, 0xd6, 0xf8, 0xef, 0xef, 0xa3, 0x75, 0xfe,
+ 0xfb, 0x25, 0xaa, 0xf0, 0xdf, 0xaf, 0x22, 0x67, 0x77, 0xf6, 0xf4, 0x09, 0x82, 0xe8, 0xe1, 0x29,
+ 0xaa, 0x45, 0x0f, 0x3b, 0xa8, 0x1e, 0x3d, 0x3c, 0x43, 0x8d, 0xdd, 0x87, 0xa0, 0xb8, 0xde, 0x78,
+ 0x5b, 0x9f, 0xb2, 0xe4, 0x22, 0x76, 0xe9, 0x86, 0x3b, 0x99, 0xb8, 0xce, 0xb6, 0x1e, 0xff, 0x33,
+ 0xe1, 0x4d, 0xf1, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x3f, 0x99, 0x3c, 0x38, 0xad, 0x30, 0x00,
+ 0x00,
+}
diff --git a/pulsar/reader.go b/pulsar/reader.go
new file mode 100644
index 0000000..5592630
--- /dev/null
+++ b/pulsar/reader.go
@@ -0,0 +1,84 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import "context"
+
+type ReaderMessage struct {
+ Reader // NOTE(review): presumably the reader that received the message - confirm
+ Message // the received message; ReaderMessage values are what ReaderOptions.MessageChannel carries
+}
+
+type ReaderOptions struct {
+ // Topic is the topic this reader will read from.
+ // This field is required when constructing the reader.
+ Topic string
+
+ // Name sets the reader name.
+ Name string
+
+ // StartMessageID sets the initial reader position by specifying a message id. The options are:
+ // * `pulsar.EarliestMessage` : Start reading from the earliest message available in the topic
+ // * `pulsar.LatestMessage` : Start reading from the end of the topic, only getting messages published after
+ // the reader was created
+ // * `MessageID` : Start reading from a particular message id; the reader will position itself on that
+ // specific position. The first message to be read will be the message next to the specified
+ // messageID
+ StartMessageID MessageID
+
+ // MessageChannel sets a channel on which the reader delivers messages.
+ // When a message is received, it will be pushed to the channel for consumption.
+ MessageChannel chan ReaderMessage
+
+ // ReceiverQueueSize sets the size of the reader receive queue.
+ // The receive queue controls how many messages can be accumulated by the Reader before the
+ // application calls Reader.Next(). Using a higher value could potentially increase the reader
+ // throughput at the expense of bigger memory utilization.
+ //
+ // Default value is 1000 messages and should be good for most use cases.
+ ReceiverQueueSize int
+
+ // SubscriptionRolePrefix sets the subscription role prefix. The default prefix is "reader".
+ SubscriptionRolePrefix string
+
+ // ReadCompacted, if enabled, makes the reader read from the compacted topic rather than the full message backlog
+ // of the topic. This means that, if the topic has been compacted, the reader will only see the latest value for
+ // each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
+ // point, the messages will be sent as normal.
+ //
+ // ReadCompacted can only be enabled when reading from a persistent topic. Attempting to enable it on non-persistent
+ // topics will cause the reader create call to fail with an error.
+ ReadCompacted bool
+}
+
+// Reader can be used to scan through all the messages currently available in a topic.
+type Reader interface {
+ // Topic returns the topic from which this reader is reading.
+ Topic() string
+
+ // Next reads the next message in the topic, blocking until a message is available.
+ Next(context.Context) (Message, error)
+
+ // HasNext checks if there is any message available to read from the current position.
+ HasNext() (bool, error)
+
+ // Close closes the reader and stops the broker from pushing more messages.
+ Close() error
+}