You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:39 UTC

[pulsar-client-go] 02/38: Initial import

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit 84cb32ea75fbc322d107995620681e0d6feff202
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Mar 27 09:34:18 2019 -0700

    Initial import
---
 .gitignore                          |    1 +
 pulsar/client.go                    |  111 +
 pulsar/consumer.go                  |  179 ++
 pulsar/error.go                     |   80 +
 pulsar/impl/batch_builder.go        |   23 +
 pulsar/impl/buffer.go               |  119 ++
 pulsar/impl/buffer_test.go          |   19 +
 pulsar/impl/closable.go             |    5 +
 pulsar/impl/commands.go             |   33 +
 pulsar/impl/connection.go           |  293 +++
 pulsar/impl/connection_pool.go      |   54 +
 pulsar/impl/connection_reader.go    |  117 +
 pulsar/impl/default_router.go       |   26 +
 pulsar/impl/lookup_service.go       |   62 +
 pulsar/impl/rpc_client.go           |   71 +
 pulsar/impl/topic_name.go           |   88 +
 pulsar/impl/topic_name_test.go      |   68 +
 pulsar/impl_client.go               |   98 +
 pulsar/impl_partition_producer.go   |   88 +
 pulsar/impl_producer.go             |  123 ++
 pulsar/message.go                   |   90 +
 pulsar/producer.go                  |  187 ++
 pulsar/pulsar_proto/PulsarApi.pb.go | 4043 +++++++++++++++++++++++++++++++++++
 pulsar/reader.go                    |   84 +
 24 files changed, 6062 insertions(+)

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..723ef36
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+.idea
\ No newline at end of file
diff --git a/pulsar/client.go b/pulsar/client.go
new file mode 100644
index 0000000..21db637
--- /dev/null
+++ b/pulsar/client.go
@@ -0,0 +1,111 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import (
+	"time"
+)
+
+// NewClient creates a Client configured with the given options.
+// It delegates to newClient and returns its result unchanged.
+func NewClient(options ClientOptions) (Client, error) {
+	return newClient(options)
+}
+
+// Authentication is an opaque interface that represents the authentication credentials.
+// It declares no methods.
+type Authentication interface {}
+
+// NewAuthenticationToken creates a new Authentication provider with the specified auth token.
+//
+// NOTE: not implemented yet. The token is currently ignored and nil is returned.
+func NewAuthenticationToken(token string) Authentication {
+	// TODO: return newAuthenticationToken(token)
+	return nil
+}
+
+// NewAuthenticationTokenSupplier creates a new Authentication provider that obtains the auth token
+// from the specified supplier function.
+//
+// NOTE: not implemented yet. The supplier is currently ignored and nil is returned.
+func NewAuthenticationTokenSupplier(tokenSupplier func() string) Authentication {
+	// TODO:  return newAuthenticationTokenSupplier(tokenSupplier)
+	return nil
+}
+
+// NewAuthenticationTLS creates a new Authentication provider with the specified TLS certificate and
+// private key.
+//
+// NOTE: not implemented yet. Both paths are currently ignored and nil is returned.
+func NewAuthenticationTLS(certificatePath string, privateKeyPath string) Authentication {
+	// TODO: return newAuthenticationTLS(certificatePath, privateKeyPath)
+	return nil
+}
+
+// NewAuthenticationAthenz creates a new Athenz Authentication provider with configuration in JSON form.
+//
+// NOTE: not implemented yet. The parameters are currently ignored and nil is returned.
+func NewAuthenticationAthenz(authParams string) Authentication {
+	// TODO: return newAuthenticationAthenz(authParams)
+	return nil
+}
+
+// ClientOptions holds the settings used to construct a Pulsar Client instance with NewClient.
+type ClientOptions struct {
+	// Configure the service URL for the Pulsar service.
+	// This parameter is required
+	URL string
+
+	// Timeout related to establishing connections.
+	// NOTE(review): the exact semantics and the default are not visible in this file - confirm
+	ConnectionTimeout time.Duration
+
+	// Set the operation timeout (default: 30 seconds)
+	// Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the
+	// operation will be marked as failed
+	OperationTimeout time.Duration
+
+	// Configure the authentication provider. (default: no authentication)
+	// This is an embedded field, so it is set by its type name.
+	// Example: `Authentication: NewAuthenticationTLS("my-cert.pem", "my-key.pem")`
+	Authentication
+
+	// Set the path to the trusted TLS certificate file
+	TLSTrustCertsFilePath string
+
+	// Configure whether the Pulsar client accepts untrusted TLS certificates from the broker (default: false)
+	TLSAllowInsecureConnection bool
+
+	// Configure whether the Pulsar client verifies the validity of the host name from the broker (default: false)
+	TLSValidateHostname bool
+}
+
+// Client is the entry point for creating producers, consumers and readers on a Pulsar service.
+type Client interface {
+	// Create the producer instance
+	// This method will block until the producer is created successfully
+	CreateProducer(ProducerOptions) (Producer, error)
+
+	// Create a `Consumer` by subscribing to a topic.
+	//
+	// If the subscription does not exist, a new subscription will be created and all messages published after the
+	// creation will be retained until acknowledged, even if the consumer is not connected
+	Subscribe(ConsumerOptions) (Consumer, error)
+
+	// Create a Reader instance.
+	// This method will block until the reader is created successfully.
+	CreateReader(ReaderOptions) (Reader, error)
+
+	// Fetch the list of partitions for a given topic
+	//
+	// If the topic is partitioned, this will return a list of partition names.
+	// If the topic is not partitioned, the returned list will contain the topic
+	// name itself.
+	//
+	// This can be used to discover the partitions and create Reader,
+	// Consumer or Producer instances directly on a particular partition.
+	TopicPartitions(topic string) ([]string, error)
+
+	// Close the Client and free associated resources
+	Close() error
+}
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
new file mode 100644
index 0000000..b32e5fc
--- /dev/null
+++ b/pulsar/consumer.go
@@ -0,0 +1,179 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import (
+	"context"
+	"time"
+)
+
+// ConsumerMessage is a pair of a Consumer and a Message.
+// Both are embedded, so their methods are promoted onto ConsumerMessage.
+type ConsumerMessage struct {
+	Consumer
+	Message
+}
+
+// SubscriptionType enumerates the types of subscription supported by Pulsar.
+type SubscriptionType int
+
+const (
+	// There can be only 1 consumer on the same topic with the same subscription name.
+	// This is the zero value of SubscriptionType.
+	Exclusive SubscriptionType = iota
+
+	// Multiple consumers will be able to use the same subscription name and the messages will be dispatched
+	// according to a round-robin rotation between the connected consumers
+	Shared
+
+	// Multiple consumers will be able to use the same subscription name but only 1 consumer will receive the messages.
+	// If that consumer disconnects, one of the other connected consumers will start receiving messages.
+	Failover
+)
+
+// InitialPosition selects where consumption starts; see ConsumerOptions.SubscriptionInitPos.
+type InitialPosition int
+
+const (
+	// Latest position which means the start consuming position will be the last message.
+	// This is the zero value of InitialPosition.
+	Latest InitialPosition = iota
+
+	// Earliest position which means the start consuming position will be the first message
+	Earliest
+)
+
+// ConsumerOptions is used to configure and create instances of Consumer
+type ConsumerOptions struct {
+	// Specify the topic this consumer will subscribe on.
+	// Either a topic, a list of topics or a topics pattern is required when subscribing
+	Topic string
+
+	// Specify a list of topics this consumer will subscribe on.
+	// Either a topic, a list of topics or a topics pattern is required when subscribing
+	Topics []string
+
+	// Specify a regular expression to subscribe to multiple topics under the same namespace.
+	// Either a topic, a list of topics or a topics pattern is required when subscribing
+	TopicsPattern string
+
+	// Specify the subscription name for this consumer
+	// This argument is required when subscribing
+	SubscriptionName string
+
+	// Attach a set of application defined properties to the consumer
+	// These properties will be visible in the topic stats
+	Properties map[string]string
+
+	// Set the timeout for unacked messages
+	// Messages not acknowledged within the given time will be replayed by the broker to the same or a different
+	// consumer.
+	// Default is 0, which means messages are not replayed based on ack time
+	AckTimeout time.Duration
+
+	// Select the subscription type to be used when subscribing to the topic.
+	// Default is `Exclusive`
+	Type SubscriptionType
+
+	// InitialPosition at which the cursor will be set when subscribing
+	// Default is `Latest`
+	SubscriptionInitPos InitialPosition
+
+	// Sets a `MessageChannel` for the consumer
+	// When a message is received, it will be pushed to the channel for consumption
+	MessageChannel chan ConsumerMessage
+
+	// Sets the size of the consumer receive queue.
+	// The consumer receive queue controls how many messages can be accumulated by the `Consumer` before the
+	// application calls `Consumer.Receive()`. Using a higher value could potentially increase the consumer
+	// throughput at the expense of bigger memory utilization.
+	// Default value is `1000` messages and should be good for most use cases.
+	// Set to -1 to disable prefetching in consumer
+	ReceiverQueueSize int
+
+	// Set the max total receiver queue size across partitions.
+	// This setting will be used to reduce the receiver queue size for individual partitions
+	// (see ReceiverQueueSize) if the total exceeds this value (default: 50000).
+	MaxTotalReceiverQueueSizeAcrossPartitions int
+
+	// Set the consumer name.
+	Name string
+
+	// If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog
+	// of the topic. This means that, if the topic has been compacted, the consumer will only see the latest value for
+	// each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
+	// point, the messages will be sent as normal.
+	//
+	// ReadCompacted can only be enabled on subscriptions to persistent topics which have a single active consumer
+	// (i.e. failover or exclusive subscriptions). Attempting to enable it on subscriptions to non-persistent topics
+	// or on a shared subscription will cause the subscribe call to fail.
+	ReadCompacted bool
+}
+
+// Consumer is an interface that abstracts the behavior of Pulsar's consumer
+type Consumer interface {
+	// Get the topic for the consumer
+	Topic() string
+
+	// Get the subscription name for the consumer
+	Subscription() string
+
+	// Unsubscribe the consumer
+	Unsubscribe() error
+
+	// Receives a single message.
+	// This call blocks until a message is available.
+	Receive(context.Context) (Message, error)
+
+	// Ack the consumption of a single message
+	Ack(Message) error
+
+	// Ack the consumption of a single message, identified by its MessageID
+	AckID(MessageID) error
+
+	// Ack the reception of all the messages in the stream up to (and including) the provided message.
+	// This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
+	// re-delivered to this consumer.
+	//
+	// Cumulative acknowledge cannot be used when the subscription type is set to Shared.
+	AckCumulative(Message) error
+
+	// Ack the reception of all the messages in the stream up to (and including) the message with the provided
+	// MessageID.
+	// This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
+	// re-delivered to this consumer.
+	//
+	// Cumulative acknowledge cannot be used when the subscription type is set to Shared.
+	AckCumulativeID(MessageID) error
+
+	// Close the consumer and stop the broker from pushing more messages
+	Close() error
+
+	// Reset the subscription associated with this consumer to a specific message id.
+	// The message id can either be a specific message or represent the first or last messages in the topic.
+	//
+	// Note: this operation can only be done on non-partitioned topics. For partitioned topics, perform the
+	//       Seek on the individual partitions instead.
+	Seek(msgID MessageID) error
+
+	// Redelivers all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is not
+	// active for the given topic. In Shared mode, the consumer's messages to be redelivered are distributed across
+	// all the connected consumers. This is a non-blocking call and it reports no error. In case the connection
+	// breaks, the messages are redelivered after reconnect.
+	RedeliverUnackedMessages()
+}
diff --git a/pulsar/error.go b/pulsar/error.go
new file mode 100644
index 0000000..929b250
--- /dev/null
+++ b/pulsar/error.go
@@ -0,0 +1,80 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+import "C"
+import "fmt"
+
+// Result is a numeric code that classifies an Error.
+// The declared codes start at 1; the zero value is not assigned to any of them.
+type Result int
+
+// Result codes carried by Error values.
+const (
+	UnknownError                          Result = 1  // Unknown error happened on broker
+	InvalidConfiguration                  Result = 2  // Invalid configuration
+	TimeoutError                          Result = 3  // Operation timed out
+	LookupError                           Result = 4  // Broker lookup failed
+	ConnectError                          Result = 5  // Failed to connect to broker
+	ReadError                             Result = 6  // Failed to read from socket
+	AuthenticationError                   Result = 7  // Authentication failed on broker
+	AuthorizationError                    Result = 8  // Client is not authorized to create producer/consumer
+	ErrorGettingAuthenticationData        Result = 9  // Client cannot find authorization data
+	BrokerMetadataError                   Result = 10 // Broker failed in updating metadata
+	BrokerPersistenceError                Result = 11 // Broker failed to persist entry
+	ChecksumError                         Result = 12 // Corrupt message checksum failure
+	ConsumerBusy                          Result = 13 // Exclusive consumer is already connected
+	NotConnectedError                     Result = 14 // Producer/Consumer is not currently connected to broker
+	AlreadyClosedError                    Result = 15 // Producer/Consumer is already closed and not accepting any operation
+	InvalidMessage                        Result = 16 // Error in publishing an already used message
+	ConsumerNotInitialized                Result = 17 // Consumer is not initialized
+	ProducerNotInitialized                Result = 18 // Producer is not initialized
+	TooManyLookupRequestException         Result = 19 // Too Many concurrent LookupRequest
+	InvalidTopicName                      Result = 20 // Invalid topic name
+	InvalidUrl                            Result = 21 // Client Initialized with Invalid Broker Url (VIP Url passed to Client Constructor)
+	ServiceUnitNotReady                   Result = 22 // Service Unit unloaded between client did lookup and producer/consumer got created
+	OperationNotSupported                 Result = 23
+	ProducerBlockedQuotaExceededError     Result = 24 // Producer is blocked
+	ProducerBlockedQuotaExceededException Result = 25 // Producer is getting exception
+	ProducerQueueIsFull                   Result = 26 // Producer queue is full
+	MessageTooBig                         Result = 27 // Trying to send a messages exceeding the max size
+	TopicNotFound                         Result = 28 // Topic not found
+	SubscriptionNotFound                  Result = 29 // Subscription not found
+	ConsumerNotFound                      Result = 30 // Consumer not found
+	UnsupportedVersionError               Result = 31 // Error when an older client/version doesn't support a required feature
+	TopicTerminated                       Result = 32 // Topic was already terminated
+	CryptoError                           Result = 33 // Error when crypto operation fails
+)
+
+// Error is the error type of this package. It pairs a human-readable message with a Result code.
+type Error struct {
+	msg    string // text returned by Error()
+	result Result // code returned by Result()
+}
+
+// Result returns the Result code attached to the error.
+func (e *Error) Result() Result {
+	return e.result
+}
+
+// Error returns the error message, which makes *Error satisfy the built-in error interface.
+func (e *Error) Error() string {
+	return e.msg
+}
+
+// newError builds an *Error carrying the given Result code.
+// The stored message is msg followed by ": " and the numeric value of the code.
+func newError(result Result, msg string) error {
+	e := new(Error)
+	e.result = result
+	e.msg = fmt.Sprintf("%s: %d", msg, result)
+	return e
+}
\ No newline at end of file
diff --git a/pulsar/impl/batch_builder.go b/pulsar/impl/batch_builder.go
new file mode 100644
index 0000000..7eb2030
--- /dev/null
+++ b/pulsar/impl/batch_builder.go
@@ -0,0 +1,23 @@
+package impl
+
+// BatchBuilder owns a Buffer intended for assembling batches.
+// NOTE: every method below is still a stub and none of them touches the buffer yet.
+type BatchBuilder struct {
+	buffer Buffer
+}
+
+// NewBatchBuilder returns a BatchBuilder backed by a 4096-byte Buffer.
+func NewBatchBuilder() *BatchBuilder {
+	return &BatchBuilder{
+		buffer: NewBuffer(4096),
+	}
+}
+
+// isFull is a stub: it always returns false.
+func (bb *BatchBuilder) isFull() bool {
+	return false
+}
+
+// hasSpace is a stub: it ignores size and always returns false.
+func (bb *BatchBuilder) hasSpace(size int) bool {
+	return false
+}
+
+// Flush is a stub: it always returns nil.
+func (bb *BatchBuilder) Flush() []byte {
+	return nil
+}
diff --git a/pulsar/impl/buffer.go b/pulsar/impl/buffer.go
new file mode 100644
index 0000000..8e81f0c
--- /dev/null
+++ b/pulsar/impl/buffer.go
@@ -0,0 +1,119 @@
+package impl
+
+import "encoding/binary"
+
+// Buffer is a byte buffer with independent reader and writer indexes.
+// Bytes between the reader and the writer index are readable; bytes after the writer index are writable.
+type Buffer interface {
+	// Number of bytes written but not yet read
+	ReadableBytes() uint32
+
+	// Number of bytes that can be written before the buffer has to grow
+	WritableBytes() uint32
+
+	// Total size of the underlying storage
+	Capacity() uint32
+
+	// Whether at least one byte can be written without growing
+	IsWritable() bool
+
+	// Return the next `size` bytes and advance the reader index.
+	// The returned slice aliases the buffer storage.
+	Read(size uint32) []byte
+
+	// Return the readable portion without advancing the reader index
+	ReadableSlice() []byte
+
+	// Return the writable portion without advancing the writer index
+	WritableSlice() []byte
+
+	// Advance the writer index when data was written in a slice
+	WrittenBytes(size uint32)
+
+	// Copy the available portion of data at the beginning of the buffer
+	MoveToFront()
+
+	// Read a big-endian uint32 and advance the reader index by 4
+	ReadUint32() uint32
+
+	// Write a big-endian uint32, growing the buffer if needed
+	WriteUint32(n uint32)
+
+	// Append all the bytes of s, growing the buffer if needed
+	Write(s []byte)
+
+	// Replace the storage with one of newSize bytes, keeping the readable data (moved to the front).
+	// The new size is never smaller than the amount of readable data.
+	Resize(newSize uint32)
+
+	// Reset both indexes to 0, discarding any readable data
+	Clear()
+}
+
+// buffer is the default Buffer implementation, backed by a single byte slice.
+type buffer struct {
+	data []byte
+
+	readerIdx uint32
+	writerIdx uint32
+}
+
+// NewBuffer creates an empty Buffer with the given initial capacity in bytes.
+func NewBuffer(size int) Buffer {
+	return &buffer{
+		data:      make([]byte, size),
+		readerIdx: 0,
+		writerIdx: 0,
+	}
+}
+
+func (b *buffer) ReadableBytes() uint32 {
+	return b.writerIdx - b.readerIdx
+}
+
+func (b *buffer) WritableBytes() uint32 {
+	return uint32(cap(b.data)) - b.writerIdx
+}
+
+func (b *buffer) Capacity() uint32 {
+	return uint32(cap(b.data))
+}
+
+func (b *buffer) IsWritable() bool {
+	return b.WritableBytes() > 0
+}
+
+func (b *buffer) Read(size uint32) []byte {
+	res := b.data[b.readerIdx : b.readerIdx+size]
+	b.readerIdx += size
+	return res
+}
+
+func (b *buffer) ReadableSlice() []byte {
+	return b.data[b.readerIdx:b.writerIdx]
+}
+
+func (b *buffer) WritableSlice() []byte {
+	return b.data[b.writerIdx:]
+}
+
+func (b *buffer) WrittenBytes(size uint32) {
+	b.writerIdx += size
+}
+
+func (b *buffer) MoveToFront() {
+	size := b.ReadableBytes()
+	copy(b.data, b.Read(size))
+	b.readerIdx = 0
+	b.writerIdx = size
+}
+
+func (b *buffer) Resize(newSize uint32) {
+	size := b.ReadableBytes()
+	if newSize < size {
+		// Shrinking below the readable data would truncate it while leaving
+		// the writer index past the end of the new storage
+		newSize = size
+	}
+
+	newData := make([]byte, newSize)
+	copy(newData, b.ReadableSlice())
+	b.data = newData
+	b.readerIdx = 0
+	b.writerIdx = size
+}
+
+// ensureWritable grows the buffer so that at least n more bytes can be written.
+// Growth at least doubles the capacity to keep repeated appends cheap.
+func (b *buffer) ensureWritable(n uint32) {
+	if b.WritableBytes() >= n {
+		return
+	}
+
+	// Resize moves the readable data to the front, so the space required is
+	// the readable data plus the bytes about to be written
+	needed := b.ReadableBytes() + n
+	newSize := b.Capacity() * 2
+	if newSize < needed {
+		newSize = needed
+	}
+	b.Resize(newSize)
+}
+
+func (b *buffer) ReadUint32() uint32 {
+	return binary.BigEndian.Uint32(b.Read(4))
+}
+
+func (b *buffer) WriteUint32(n uint32) {
+	b.ensureWritable(4)
+	binary.BigEndian.PutUint32(b.WritableSlice(), n)
+	b.writerIdx += 4
+}
+
+func (b *buffer) Write(s []byte) {
+	// Without growing first, copy() would silently truncate the data while
+	// the writer index still advanced by len(s)
+	b.ensureWritable(uint32(len(s)))
+	copy(b.WritableSlice(), s)
+	b.writerIdx += uint32(len(s))
+}
+
+func (b *buffer) Clear() {
+	b.readerIdx = 0
+	b.writerIdx = 0
+}
diff --git a/pulsar/impl/buffer_test.go b/pulsar/impl/buffer_test.go
new file mode 100644
index 0000000..1d72196
--- /dev/null
+++ b/pulsar/impl/buffer_test.go
@@ -0,0 +1,19 @@
+package impl
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+// TestBuffer verifies the readable/writable/capacity accounting of a Buffer
+// right after creation and again after a small write.
+func TestBuffer(t *testing.T) {
+	buf := NewBuffer(1024)
+
+	// The capacity is expected to stay at 1024 throughout
+	expectSizes := func(readable uint32, writable uint32) {
+		assert.Equal(t, readable, buf.ReadableBytes())
+		assert.Equal(t, writable, buf.WritableBytes())
+		assert.Equal(t, uint32(1024), buf.Capacity())
+	}
+
+	expectSizes(uint32(0), uint32(1024))
+
+	buf.Write([]byte("hello"))
+	expectSizes(uint32(5), uint32(1019))
+}
diff --git a/pulsar/impl/closable.go b/pulsar/impl/closable.go
new file mode 100644
index 0000000..c483901
--- /dev/null
+++ b/pulsar/impl/closable.go
@@ -0,0 +1,5 @@
+package impl
+
+// Closable is implemented by anything that can be closed, reporting a failure through the returned error.
+type Closable interface {
+	Close() error
+}
diff --git a/pulsar/impl/commands.go b/pulsar/impl/commands.go
new file mode 100644
index 0000000..90760e1
--- /dev/null
+++ b/pulsar/impl/commands.go
@@ -0,0 +1,33 @@
+package impl
+
+import (
+	"github.com/golang/protobuf/proto"
+	log "github.com/sirupsen/logrus"
+	pb "pulsar-client-go-native/pulsar/pulsar_proto"
+)
+
+// MaxFrameSize is the largest frame size, in bytes (5 MiB), accepted when reading from a connection.
+const MaxFrameSize = 5 * 1024 * 1024
+
+// baseCommand wraps msg in a BaseCommand envelope of the given type.
+//
+// Only CONNECT, LOOKUP and PARTITIONED_METADATA are supported; msg must be the
+// concrete command struct matching cmdType, otherwise the type assertion panics.
+// Any other command type triggers log.Panic.
+func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand {
+	envelope := new(pb.BaseCommand)
+	envelope.Type = &cmdType
+
+	// Go switch cases never fall through, so no explicit break is needed
+	switch cmdType {
+	case pb.BaseCommand_CONNECT:
+		envelope.Connect = msg.(*pb.CommandConnect)
+	case pb.BaseCommand_LOOKUP:
+		envelope.LookupTopic = msg.(*pb.CommandLookupTopic)
+	case pb.BaseCommand_PARTITIONED_METADATA:
+		envelope.PartitionMetadata = msg.(*pb.CommandPartitionedTopicMetadata)
+	default:
+		log.Panic("Missing command type: ", cmdType)
+	}
+
+	return envelope
+}
+
diff --git a/pulsar/impl/connection.go b/pulsar/impl/connection.go
new file mode 100644
index 0000000..6379f6a
--- /dev/null
+++ b/pulsar/impl/connection.go
@@ -0,0 +1,293 @@
+package impl
+
+import (
+	"errors"
+	"github.com/golang/protobuf/proto"
+	log "github.com/sirupsen/logrus"
+	"net"
+	pb "pulsar-client-go-native/pulsar/pulsar_proto"
+	"sync"
+	"sync/atomic"
+)
+
+// Connection is the interface used to send requests to a broker.
+type Connection interface {
+	// Send req to the broker. The callback is invoked with the response command whose request id matches requestId.
+	SendRequest(requestId uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand))
+
+	// Close the connection
+	Close()
+}
+
+// connectionState tracks the lifecycle of a connection.
+type connectionState int
+
+const (
+	// Initial state of a newly created connection
+	connectionInit connectionState = iota
+	// NOTE(review): never assigned in the visible code; presumably meant for the dialing phase
+	connectionConnecting
+	// The TCP connection is established but the handshake is not complete yet
+	connectionTcpConnected
+	// The handshake completed; waitUntilReady returns nil in this state
+	connectionReady
+	// The connection failed or was closed; waitUntilReady returns an error in this state
+	connectionClosed
+)
+
+// request is a command sent to the broker for which a response is still expected.
+type request struct {
+	id       uint64                         // request id used to match the response
+	cmd      *pb.BaseCommand                // command that was sent
+	callback func(command *pb.BaseCommand) // invoked with the matching response
+}
+
+// connection is the Connection implementation backed by a TCP socket.
+type connection struct {
+	// The embedded mutex is taken by changeState and waitUntilReady around accesses to state
+	sync.Mutex
+	// Broadcast whenever state changes; uses the connection itself as its Locker
+	cond  *sync.Cond
+	state connectionState
+
+	// Key under which the connection is stored in the connection pool
+	logicalAddr  string
+	// Address that is actually dialed
+	physicalAddr string
+	// Underlying socket; nil until connect() succeeds
+	cnx          net.Conn
+
+	// Scratch buffer reused by writeCommand for every outgoing frame
+	writeBuffer Buffer
+	reader      *connectionReader
+
+	log *log.Entry
+
+	// Counter incremented atomically by newRequestId
+	requestIdGenerator uint64
+
+	// Requests drained by run()
+	incomingRequests chan *request
+	// Requests awaiting a response, keyed by request id
+	pendingReqs      map[uint64]*request
+}
+
+// newConnection creates a connection in the connectionInit state.
+// No network activity happens here; start() must be called to dial the broker.
+func newConnection(logicalAddr string, physicalAddr string) *connection {
+	cnx := &connection{
+		state:        connectionInit,
+		logicalAddr:  logicalAddr,
+		physicalAddr: physicalAddr,
+		writeBuffer:  NewBuffer(4096),
+		log:          log.WithField("raddr", physicalAddr),
+		// run() receives from this channel. Left nil, every send to it
+		// would block forever.
+		incomingRequests: make(chan *request),
+		pendingReqs:      make(map[uint64]*request),
+	}
+	cnx.reader = newConnectionReader(cnx)
+	// The connection embeds sync.Mutex, so it can act as the Locker of its own condition variable
+	cnx.cond = sync.NewCond(cnx)
+	return cnx
+}
+
+// start launches the goroutine that drives the connection.
+func (c *connection) start() {
+	// Each connection gets its own goroutine that dials the broker, performs
+	// the handshake and then serves requests until the connection ends
+	go func() {
+		if c.connect() && c.doHandshake() {
+			c.run()
+			return
+		}
+
+		// Either step failed: wake up anybody blocked in waitUntilReady()
+		c.changeState(connectionClosed)
+	}()
+}
+
+// connect dials the broker at physicalAddr.
+// It returns true once the TCP connection is established. On failure the
+// connection is closed and false is returned.
+func (c *connection) connect() (ok bool) {
+	c.log.Info("Connecting to broker")
+
+	cnx, err := net.Dial("tcp", c.physicalAddr)
+	if err != nil {
+		c.log.WithError(err).Warn("Failed to connect to broker.")
+		c.internalClose()
+		return false
+	}
+
+	c.cnx = cnx
+	// The reader created in newConnection() wrapped c.cnx while it was still
+	// nil, so it has to be rebuilt now that the socket exists
+	c.reader = newConnectionReader(c)
+
+	c.log = c.log.WithField("laddr", c.cnx.LocalAddr())
+	c.log.Info("TCP connection established")
+	// Go through changeState so the write is done under the same lock that
+	// waitUntilReady() holds while reading the state
+	c.changeState(connectionTcpConnected)
+	return true
+}
+
+// doHandshake sends the 'Connect' command and waits for the broker to answer
+// with 'Connected'.
+// It returns true and moves the connection to connectionReady on success. On
+// failure the socket is closed and false is returned.
+func (c *connection) doHandshake() (ok bool) {
+	// Send 'Connect' command to initiate handshake
+	version := int32(pb.ProtocolVersion_v13)
+	c.writeCommand(baseCommand(pb.BaseCommand_CONNECT, &pb.CommandConnect{
+		ProtocolVersion: &version,
+		ClientVersion:   proto.String("Pulsar Go 0.1"),
+		// AuthMethodName: "token",
+		// AuthData: authData,
+	}))
+
+	cmd, _, err := c.reader.readSingleCommand()
+	if err != nil {
+		c.log.WithError(err).Warn("Failed to perform initial handshake")
+		// Nothing else closes the socket after a failed handshake
+		c.internalClose()
+		return false
+	}
+
+	if cmd.Connected == nil {
+		c.log.Warnf("Failed to perform initial handshake - Expecting 'Connected' cmd, got '%s'",
+			cmd.Type)
+		c.internalClose()
+		return false
+	}
+
+	c.log.Info("Connection is ready")
+	c.changeState(connectionReady)
+	return true
+}
+
+// waitUntilReady blocks until the connection has either completed the
+// handshake or failed.
+// It returns nil when the connection is ready and an error when it is closed.
+func (c *connection) waitUntilReady() error {
+	c.Lock()
+	defer c.Unlock()
+
+	for {
+		switch c.state {
+		// Go cases do not fall through: all the transient states must be
+		// listed in a single case. As separate empty cases, the first two
+		// would spin with the lock held and changeState() could never run.
+		case connectionInit, connectionConnecting, connectionTcpConnected:
+			// Wait for the state to change (the lock is released while waiting)
+			c.cond.Wait()
+
+		case connectionReady:
+			return nil
+
+		case connectionClosed:
+			return errors.New("connection error")
+		}
+	}
+}
+
+// run starts the reader goroutine and then loops forever, registering each
+// incoming request as pending and writing its command on the connection.
+func (c *connection) run() {
+	// All reads come from the reader goroutine
+	go c.reader.readFromConnection()
+
+	for {
+		next := <-c.incomingRequests
+		c.pendingReqs[next.id] = next
+		c.writeCommand(next.cmd)
+	}
+}
+
+// writeCommand serializes cmd and writes it on the connection as a single frame.
+// A serialization failure is logged as fatal. A write failure closes the connection.
+func (c *connection) writeCommand(cmd proto.Message) {
+	// Wire format
+	// [FRAME_SIZE] [CMD_SIZE][CMD]
+	cmdSize := uint32(proto.Size(cmd))
+	frameSize := cmdSize + 4
+	bufferSize := frameSize + 4
+
+	c.writeBuffer.Clear()
+	if c.writeBuffer.WritableBytes() < bufferSize {
+		// Doubling once is not enough for a command bigger than twice the
+		// current capacity: grow to whichever is larger
+		newSize := c.writeBuffer.Capacity() * 2
+		if newSize < bufferSize {
+			newSize = bufferSize
+		}
+		c.writeBuffer.Resize(newSize)
+	}
+
+	c.writeBuffer.WriteUint32(frameSize)
+	c.writeBuffer.WriteUint32(cmdSize)
+	serialized, err := proto.Marshal(cmd)
+	if err != nil {
+		c.log.WithError(err).Fatal("Protobuf serialization error")
+	}
+
+	c.writeBuffer.Write(serialized)
+
+	if _, err := c.cnx.Write(c.writeBuffer.ReadableSlice()); err != nil {
+		c.log.WithError(err).Warn("Failed to write on connection")
+		c.internalClose()
+	}
+}
+
+// receivedCommand dispatches a command read from the broker.
+// Response commands are routed to handleResponse using their request id, a set
+// of known commands is accepted but not processed yet, and any other type is
+// logged as invalid before calling Close.
+func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []byte) {
+	c.log.Infof("Received command: %s -- payload: %v", cmd, headersAndPayload)
+
+	var requestId uint64
+
+	switch *cmd.Type {
+	// Responses: extract the id of the request they answer
+	case pb.BaseCommand_SUCCESS:
+		requestId = *cmd.Success.RequestId
+	case pb.BaseCommand_PRODUCER_SUCCESS:
+		requestId = *cmd.ProducerSuccess.RequestId
+	case pb.BaseCommand_PARTITIONED_METADATA_RESPONSE:
+		requestId = *cmd.PartitionMetadataResponse.RequestId
+	case pb.BaseCommand_LOOKUP_RESPONSE:
+		requestId = *cmd.LookupTopicResponse.RequestId
+	case pb.BaseCommand_CONSUMER_STATS_RESPONSE:
+		requestId = *cmd.ConsumerStatsResponse.RequestId
+	case pb.BaseCommand_GET_LAST_MESSAGE_ID_RESPONSE:
+		requestId = *cmd.GetLastMessageIdResponse.RequestId
+	case pb.BaseCommand_GET_TOPICS_OF_NAMESPACE_RESPONSE:
+		requestId = *cmd.GetTopicsOfNamespaceResponse.RequestId
+	case pb.BaseCommand_GET_SCHEMA_RESPONSE:
+		requestId = *cmd.GetSchemaResponse.RequestId
+
+	// Known commands that are not processed yet
+	case pb.BaseCommand_ERROR,
+		pb.BaseCommand_CLOSE_PRODUCER,
+		pb.BaseCommand_CLOSE_CONSUMER,
+		pb.BaseCommand_SEND_RECEIPT,
+		pb.BaseCommand_SEND_ERROR,
+		pb.BaseCommand_MESSAGE,
+		pb.BaseCommand_PONG,
+		pb.BaseCommand_ACTIVE_CONSUMER_CHANGE:
+		// TODO
+		return
+
+	default:
+		c.log.Errorf("Received invalid command type: %s", cmd.Type)
+		c.Close()
+		return
+	}
+
+	c.handleResponse(requestId, cmd)
+}
+
+// SendRequest registers the request as pending and writes it on the connection.
+// The callback is invoked with the response once a command carrying the same
+// request id is received.
+func (c *connection) SendRequest(requestId uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand)) {
+	// pendingReqs is also modified by handleResponse(), which runs on the
+	// reader goroutine: unsynchronized map access is a data race in Go
+	c.Lock()
+	c.pendingReqs[requestId] = &request{
+		id:       requestId,
+		cmd:      req,
+		callback: callback,
+	}
+	c.Unlock()
+
+	c.writeCommand(req)
+}
+
+// handleResponse completes the pending request identified by requestId by
+// invoking its callback with the response. Responses that match no pending
+// request are logged and dropped.
+func (c *connection) handleResponse(requestId uint64, response *pb.BaseCommand) {
+	c.Lock()
+	request, ok := c.pendingReqs[requestId]
+	if ok {
+		delete(c.pendingReqs, requestId)
+	}
+	c.Unlock()
+
+	if !ok {
+		c.log.Warnf("Received unexpected response for request %d of type %s", requestId, response.Type)
+		return
+	}
+
+	// The callback runs outside the lock so that it can send new requests
+	request.callback(response)
+}
+
+// Close is part of the Connection interface.
+// NOTE: not implemented yet; it currently does nothing.
+func (c *connection) Close() {
+	// TODO
+}
+
+// changeState sets the connection state under the lock and wakes up every
+// goroutine waiting on the condition variable.
+func (c *connection) changeState(state connectionState) {
+	c.Lock()
+	defer c.Unlock()
+
+	c.state = state
+	c.cond.Broadcast()
+}
+
+// newRequestId atomically increments the request id counter and returns the new value.
+func (c *connection) newRequestId() uint64 {
+	return atomic.AddUint64(&c.requestIdGenerator, 1)
+}
+
+// internalClose marks the connection as closed, wakes up the goroutines
+// waiting for a state change and closes the socket when there is one.
+// Note that the state is written without taking the connection lock.
+func (c *connection) internalClose() {
+	c.state = connectionClosed
+	c.cond.Broadcast()
+
+	if c.cnx == nil {
+		// Never connected: there is no socket to close
+		return
+	}
+	c.cnx.Close()
+}
diff --git a/pulsar/impl/connection_pool.go b/pulsar/impl/connection_pool.go
new file mode 100644
index 0000000..676f205
--- /dev/null
+++ b/pulsar/impl/connection_pool.go
@@ -0,0 +1,54 @@
+package impl
+
+import (
+	"sync"
+)
+
+// ConnectionPool hands out connections to brokers, reusing them across callers.
+type ConnectionPool interface {
+	// Return a ready connection for the given logical address, creating it if needed
+	GetConnection(logicalAddr string, physicalAddr string) (Connection, error)
+
+	// Close all the connections in the pool
+	Close()
+}
+
+// connectionPool is the ConnectionPool implementation.
+type connectionPool struct {
+	// Maps a logical address (string) to its *connection
+	pool sync.Map
+}
+
+// NewConnectionPool creates an empty ConnectionPool.
+func NewConnectionPool() ConnectionPool {
+	return &connectionPool{}
+}
+
+// GetConnection returns a ready connection for logicalAddr.
+// A cached connection is reused when it is (or becomes) ready. A failed one is
+// evicted and replaced by a new connection to physicalAddr. An error is
+// returned when the new connection cannot be established.
+func (p *connectionPool) GetConnection(logicalAddr string, physicalAddr string) (Connection, error) {
+	if cachedCnx, found := p.pool.Load(logicalAddr); found {
+		cnx := cachedCnx.(*connection)
+		// A nil error means ready. The original test was inverted: it
+		// returned failed connections and evicted the healthy ones.
+		if err := cnx.waitUntilReady(); err == nil {
+			// Connection is ready to be used
+			return cnx, nil
+		}
+
+		// The cached connection is failed
+		p.pool.Delete(logicalAddr)
+	}
+
+	// Try to create a new connection
+	newCnx, wasCached := p.pool.LoadOrStore(logicalAddr, newConnection(logicalAddr, physicalAddr))
+	cnx := newCnx.(*connection)
+	if !wasCached {
+		// Only the goroutine that actually stored the connection starts it
+		cnx.start()
+	}
+
+	if err := cnx.waitUntilReady(); err != nil {
+		return nil, err
+	}
+	return cnx, nil
+}
+
+// Close calls Close on every connection currently stored in the pool.
+func (p *connectionPool) Close() {
+	closeEntry := func(_, cnx interface{}) bool {
+		cnx.(Connection).Close()
+		// Returning true keeps the iteration going
+		return true
+	}
+	p.pool.Range(closeEntry)
+}
diff --git a/pulsar/impl/connection_reader.go b/pulsar/impl/connection_reader.go
new file mode 100644
index 0000000..5bb87c6
--- /dev/null
+++ b/pulsar/impl/connection_reader.go
@@ -0,0 +1,117 @@
+package impl
+
+import (
+	"bufio"
+	"github.com/golang/protobuf/proto"
+	"github.com/pkg/errors"
+	"io"
+	pb "pulsar-client-go-native/pulsar/pulsar_proto"
+)
+
+// connectionReader reads frames from a connection and dispatches the decoded
+// commands back to it.
+type connectionReader struct {
+	// cnx is the connection the frames are read from.
+	cnx *connection
+	// buffer accumulates the bytes read from the connection until a complete
+	// frame is available.
+	buffer Buffer
+	// NOTE(review): reader is created in newConnectionReader but none of the
+	// methods in this file read through it -- confirm whether it is needed.
+	reader *bufio.Reader
+}
+
+// newConnectionReader builds a reader for cnx with a 4096-byte initial buffer.
+func newConnectionReader(cnx *connection) *connectionReader {
+	r := &connectionReader{cnx: cnx}
+	r.reader = bufio.NewReader(cnx.cnx)
+	r.buffer = NewBuffer(4096)
+	return r
+}
+
+// readFromConnection keeps reading commands and handing them to the connection
+// until a read fails; it then closes the connection and returns.
+func (r *connectionReader) readFromConnection() {
+	for {
+		cmd, headersAndPayload, err := r.readSingleCommand()
+		if err == nil {
+			// Process
+			r.cnx.log.Debug("Got command! ", cmd, " with payload ", headersAndPayload)
+			r.cnx.receivedCommand(cmd, headersAndPayload)
+			continue
+		}
+
+		r.cnx.log.WithError(err).Info("Error reading from connection")
+		r.cnx.internalClose()
+		return
+	}
+}
+
+// readSingleCommand reads exactly one frame from the connection and decodes it.
+//
+// The frame layout consumed here is: a 4-byte frame size, a 4-byte command
+// size, the serialized command and, when the frame is larger than that, the
+// remaining bytes, which are returned as headersAndPayload.
+//
+// It returns the decoded command and a copy of the trailing bytes (nil when
+// there are none), or an error when the connection cannot supply enough data,
+// the announced sizes are inconsistent, or the command cannot be parsed.
+func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndPayload []byte, err error) {
+	// First, we need to read the frame size
+	if readable := r.buffer.ReadableBytes(); readable < 4 {
+		if readable == 0 {
+			// If the buffer is empty, just go back to write at the beginning
+			r.buffer.Clear()
+		}
+		// Only wait for the bytes that are still missing: part of the size
+		// prefix may already be buffered.
+		if !r.readAtLeast(4 - readable) {
+			return nil, nil, errors.New("Short read when reading frame size")
+		}
+	}
+
+	// We have enough to read frame size
+	frameSize := r.buffer.ReadUint32()
+	if frameSize > MaxFrameSize {
+		r.cnx.log.Warnf("Received too big frame size. size=%d", frameSize)
+		r.cnx.internalClose()
+		return nil, nil, errors.New("Frame size too big")
+	}
+	if frameSize < 4 {
+		// The frame must at least hold the 4-byte command size.
+		r.cnx.log.Warnf("Received too small frame size. size=%d", frameSize)
+		r.cnx.internalClose()
+		return nil, nil, errors.New("Frame size too small")
+	}
+
+	// Next, we read the rest of the frame. Asking for the whole frame size when
+	// part of it is already buffered would block until bytes of the *next*
+	// frame arrive, so only the remainder is requested.
+	if readable := r.buffer.ReadableBytes(); readable < frameSize {
+		if !r.readAtLeast(frameSize - readable) {
+			return nil, nil, errors.New("Short read when reading frame")
+		}
+	}
+
+	// We have now the complete frame
+	cmdSize := r.buffer.ReadUint32()
+	if cmdSize > frameSize-4 {
+		// A command larger than its frame would read past the frame and
+		// underflow the payload size computed below.
+		r.cnx.log.Warnf("Received invalid command size. cmdSize=%d frameSize=%d", cmdSize, frameSize)
+		r.cnx.internalClose()
+		return nil, nil, errors.New("Command size exceeds frame size")
+	}
+
+	cmd, err = r.deserializeCmd(r.buffer.Read(cmdSize))
+	if err != nil {
+		return nil, nil, err
+	}
+
+	// Also read the eventual payload. It is copied because the buffer is reused
+	// for the following frames.
+	headersAndPayloadSize := frameSize - (cmdSize + 4)
+	if headersAndPayloadSize > 0 {
+		headersAndPayload = make([]byte, headersAndPayloadSize)
+		copy(headersAndPayload, r.buffer.Read(headersAndPayloadSize))
+	}
+	return cmd, headersAndPayload, nil
+}
+
+// readAtLeast reads at least size more bytes from the connection into the
+// buffer, first making room for them when the writable space is too small.
+// It returns false, after closing the connection, when the read fails.
+func (r *connectionReader) readAtLeast(size uint32) (ok bool) {
+	if r.buffer.WritableBytes() < size {
+		// There's not enough room in the current buffer to read the requested amount of data
+		totalFrameSize := r.buffer.ReadableBytes() + size
+		if totalFrameSize <= r.buffer.Capacity() {
+			// Compact the buffer by moving the partial data to the beginning.
+			// This will have enough room for reading the remainder of the data
+			r.buffer.MoveToFront()
+		} else {
+			// Resize to a bigger buffer to avoid continuous resizing
+			r.buffer.Resize(totalFrameSize * 2)
+		}
+	}
+
+	n, err := io.ReadAtLeast(r.cnx.cnx, r.buffer.WritableSlice(), int(size))
+	if err == nil {
+		r.buffer.WrittenBytes(uint32(n))
+		return true
+	}
+
+	r.cnx.internalClose()
+	return false
+}
+
+// deserializeCmd parses data as a BaseCommand. When parsing fails it logs a
+// warning, closes the connection and returns the parse error.
+func (r *connectionReader) deserializeCmd(data []byte) (*pb.BaseCommand, error) {
+	parsed := &pb.BaseCommand{}
+	if err := proto.Unmarshal(data, parsed); err != nil {
+		r.cnx.log.WithError(err).Warn("Failed to parse protobuf command")
+		r.cnx.internalClose()
+		return nil, err
+	}
+	return parsed, nil
+}
diff --git a/pulsar/impl/default_router.go b/pulsar/impl/default_router.go
new file mode 100644
index 0000000..bb5ca73
--- /dev/null
+++ b/pulsar/impl/default_router.go
@@ -0,0 +1,26 @@
+package impl
+
+import (
+	"time"
+)
+
+// defaultRouter holds the state for the default routing policy.
+//
+// NOTE(review): it is only referenced from commented-out code in
+// NewDefaultRouter at the moment.
+type defaultRouter struct {
+	partitionIdx     uint32
+	maxBatchingDelay time.Duration
+}
+
+// NewDefaultRouter returns the default partition-selection function.
+//
+// The returned function yields 0 when the topic has exactly one partition and
+// -1 otherwise; maxBatchingDelay is not used yet.
+//
+// NOTE(review): -1 is not a usable slice index, so callers that index with
+// the result need routing for multi-partition topics to be implemented first.
+func NewDefaultRouter(maxBatchingDelay time.Duration) func(uint32) int {
+	// TODO: XXX
+	//state := &defaultRouter{
+	//	partitionIdx:     rand.Uint32(),
+	//	maxBatchingDelay: maxBatchingDelay,
+	//}
+
+	return func(numPartitions uint32) int {
+		if numPartitions != 1 {
+			return -1
+		}
+
+		return 0
+	}
+}
diff --git a/pulsar/impl/lookup_service.go b/pulsar/impl/lookup_service.go
new file mode 100644
index 0000000..0ebf424
--- /dev/null
+++ b/pulsar/impl/lookup_service.go
@@ -0,0 +1,62 @@
+package impl
+
+import (
+	log "github.com/sirupsen/logrus"
+	"net/url"
+	pb "pulsar-client-go-native/pulsar/pulsar_proto"
+)
+
+// LookupResult carries the broker addresses produced by a topic lookup.
+type LookupResult struct {
+	LogicalAddr  *url.URL
+	PhysicalAddr *url.URL
+}
+
+// LookupService resolves topics to the broker that should be contacted.
+type LookupService interface {
+	// Lookup resolves the given topic name.
+	Lookup(topic string) (*LookupResult, error)
+}
+
+// lookupService is the LookupService implementation; it sends its lookup
+// requests through rpcClient.
+type lookupService struct {
+	rpcClient RpcClient
+}
+
+// NewLookupService returns a LookupService that issues its requests through
+// the given RpcClient.
+func NewLookupService(rpcClient RpcClient) LookupService {
+	return &lookupService{
+		rpcClient: rpcClient,
+	}
+}
+
+func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
+	// Follow brokers redirect up to certain number of times
+	for i := 0; i < 20; i++ {
+		id := ls.rpcClient.NewRequestId()
+		res, err := ls.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_LOOKUP, &pb.CommandLookupTopic{
+			RequestId: &id,
+			Topic:     &topic,
+		})
+
+		if err != nil {
+			return nil, err
+		}
+
+		log.Infof("Got lookup response: %s", res)
+		lr := res.Response.LookupTopicResponse
+		switch *lr.Response {
+		case pb.CommandLookupTopicResponse_Redirect:
+			log.WithField("topic", topic).Infof("Follow redirect to broker. %v / %v - Use proxy: %v",
+				lr.BrokerServiceUrl, lr.BrokerServiceUrlTls, lr.ProxyThroughServiceUrl)
+			break
+
+		case pb.CommandLookupTopicResponse_Connect:
+			log.WithField("topic", topic).Infof("Successfully looked up topic on broker. %s / %s - Use proxy: %t",
+				lr.GetBrokerServiceUrl(), lr.GetBrokerServiceUrlTls(), lr.GetProxyThroughServiceUrl())
+			return nil, nil
+
+		case pb.CommandLookupTopicResponse_Failed:
+			log.WithField("topic", topic).Warn("Failed to lookup topic",
+				lr.Error.String())
+			return nil, nil
+		}
+	}
+
+	return nil, nil
+}
diff --git a/pulsar/impl/rpc_client.go b/pulsar/impl/rpc_client.go
new file mode 100644
index 0000000..fd76569
--- /dev/null
+++ b/pulsar/impl/rpc_client.go
@@ -0,0 +1,71 @@
+package impl
+
+import (
+	"github.com/golang/protobuf/proto"
+	"net/url"
+	pb "pulsar-client-go-native/pulsar/pulsar_proto"
+	"sync"
+	"sync/atomic"
+)
+
+// RpcResult is the outcome of a request: the response command and the
+// connection the request was sent on.
+type RpcResult struct {
+	Response *pb.BaseCommand
+	Cnx      Connection
+}
+
+// RpcClient sends request commands to brokers and waits for their responses.
+type RpcClient interface {
+	// Create a new unique request id
+	NewRequestId() uint64
+
+	// Send a request and block until the result is available
+	RequestToAnyBroker(requestId uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RpcResult, error)
+
+	// Send a request over the connection identified by the given logical and
+	// physical addresses and block until the result is available
+	Request(logicalAddr string, physicalAddr string, requestId uint64,
+		cmdType pb.BaseCommand_Type, message proto.Message) (*RpcResult, error)
+}
+
+// rpcClient is the RpcClient implementation.
+type rpcClient struct {
+	// hostPort is the host part of the service URL; RequestToAnyBroker uses
+	// it as both the logical and the physical address.
+	hostPort string
+	// pool provides the connections the requests are sent on.
+	pool ConnectionPool
+	// requestIdGenerator is incremented atomically by NewRequestId.
+	requestIdGenerator uint64
+}
+
+// NewRpcClient returns an RpcClient that targets the host of serviceUrl and
+// takes its connections from pool.
+func NewRpcClient(serviceUrl *url.URL, pool ConnectionPool) RpcClient {
+	return &rpcClient{
+		hostPort: serviceUrl.Host,
+		pool:     pool,
+	}
+}
+
+// RequestToAnyBroker sends the request to the configured service address,
+// using it as both the logical and the physical address.
+func (c *rpcClient) RequestToAnyBroker(requestId uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RpcResult, error) {
+	return c.Request(c.hostPort, c.hostPort, requestId, cmdType, message)
+}
+
+func (c *rpcClient) Request(logicalAddr string, physicalAddr string, requestId uint64,
+	cmdType pb.BaseCommand_Type, message proto.Message) (*RpcResult, error) {
+	// TODO: Add retry logic in case of connection issues
+	cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
+	if err != nil {
+		return nil, err
+	}
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+
+	rpcResult := &RpcResult{
+		Cnx: cnx,
+	}
+
+	// TODO: Handle errors with disconnections
+	cnx.SendRequest(requestId, baseCommand(cmdType, message), func(response *pb.BaseCommand) {
+		rpcResult.Response = response
+		wg.Done()
+	})
+
+	wg.Wait()
+	return rpcResult, nil
+}
+
+// NewRequestId atomically increments the client's request id counter and
+// returns the incremented value.
+func (c *rpcClient) NewRequestId() uint64 {
+	return atomic.AddUint64(&c.requestIdGenerator, 1)
+}
diff --git a/pulsar/impl/topic_name.go b/pulsar/impl/topic_name.go
new file mode 100644
index 0000000..2bafeb2
--- /dev/null
+++ b/pulsar/impl/topic_name.go
@@ -0,0 +1,88 @@
+package impl
+
+import (
+	"errors"
+	"fmt"
+	"strconv"
+	"strings"
+)
+
+// TopicName is the result of parsing a topic name with ParseTopicName.
+type TopicName struct {
+	// Name is the fully qualified topic name, including the domain prefix.
+	Name string
+	// Namespace is "tenant/namespace", or "tenant/cluster/namespace" for
+	// legacy topic names.
+	Namespace string
+	// Partition is the partition index parsed from the name, or -1 when the
+	// name does not contain the partition suffix.
+	Partition int
+}
+
+const (
+	// Tenant and namespace used to expand a short topic name that consists of
+	// the topic only.
+	publicTenant     = "public"
+	defaultNamespace = "default"
+	// Marker that precedes the partition index in a partition's topic name.
+	partitionedTopicSuffix = "-partition-"
+)
+
+// ParseTopicName parses a topic name given either in short form (<topic> or
+// <tenant>/<namespace>/<topic>) or in fully qualified form
+// (<domain>://tenant/namespace/topic, or the legacy
+// <domain>://tenant/cluster/namespace/topic).
+//
+// Short names are expanded to the "persistent" domain, using the public tenant
+// and default namespace when only the topic is given. An error is returned
+// for a malformed short name, an unknown domain, a name with too few
+// components, or a partition suffix that is not followed by a number.
+func ParseTopicName(topic string) (*TopicName, error) {
+	// The topic name can be in two different forms, one is fully qualified topic name,
+	// the other one is short topic name
+	if !strings.Contains(topic, "://") {
+		// The short topic name can be:
+		// - <topic>
+		// - <property>/<namespace>/<topic>
+		shortParts := strings.Split(topic, "/")
+		switch len(shortParts) {
+		case 3:
+			topic = "persistent://" + topic
+		case 1:
+			topic = "persistent://" + publicTenant + "/" + defaultNamespace + "/" + shortParts[0]
+		default:
+			return nil, errors.New(
+				"Invalid short topic name '" + topic +
+					"', it should be in the format of <tenant>/<namespace>/<topic> or <topic>")
+		}
+	}
+
+	// The fully qualified topic name can be in two different forms:
+	// new:    persistent://tenant/namespace/topic
+	// legacy: persistent://tenant/cluster/namespace/topic
+	domainAndRest := strings.SplitN(topic, "://", 2)
+	domain, rest := domainAndRest[0], domainAndRest[1]
+	switch domain {
+	case "persistent", "non-persistent":
+		// Supported domains
+	default:
+		return nil, errors.New("Invalid topic domain: " + domain)
+	}
+
+	// The rest of the name can be in different forms:
+	// new:    tenant/namespace/<localName>
+	// legacy: tenant/cluster/namespace/<localName>
+	// Examples of localName:
+	// 1. some/name/xyz//
+	// 2. /xyz-123/feeder-2
+	var namespace string
+	switch segments := strings.SplitN(rest, "/", 4); len(segments) {
+	case 3:
+		// New topic name without cluster name
+		namespace = segments[0] + "/" + segments[1]
+	case 4:
+		// Legacy topic name that includes cluster name
+		namespace = fmt.Sprintf("%s/%s/%s", segments[0], segments[1], segments[2])
+	default:
+		return nil, errors.New("Invalid topic name: " + topic)
+	}
+
+	partition, err := getPartitionIndex(topic)
+	if err != nil {
+		return nil, err
+	}
+
+	return &TopicName{
+		Name:      topic,
+		Namespace: namespace,
+		Partition: partition,
+	}, nil
+}
+
+// getPartitionIndex returns the partition index encoded in topic, or -1 when
+// topic does not contain the partition suffix. When the suffix is present,
+// whatever follows the last "-" must be a number, otherwise the conversion
+// error is returned.
+func getPartitionIndex(topic string) (int, error) {
+	if !strings.Contains(topic, partitionedTopicSuffix) {
+		return -1, nil
+	}
+
+	lastDash := strings.LastIndex(topic, "-")
+	return strconv.Atoi(topic[lastDash+1:])
+}
diff --git a/pulsar/impl/topic_name_test.go b/pulsar/impl/topic_name_test.go
new file mode 100644
index 0000000..b4d6184
--- /dev/null
+++ b/pulsar/impl/topic_name_test.go
@@ -0,0 +1,68 @@
+package impl
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+// TestParseTopicName checks the name, namespace and partition produced for
+// fully qualified names, short names, a partition name and a legacy (V1) name.
+func TestParseTopicName(t *testing.T) {
+	topic, err := ParseTopicName("persistent://my-tenant/my-ns/my-topic")
+
+	assert.Nil(t, err)
+	assert.Equal(t, "persistent://my-tenant/my-ns/my-topic", topic.Name)
+	assert.Equal(t, "my-tenant/my-ns", topic.Namespace)
+	assert.Equal(t, -1, topic.Partition)
+
+	// Short name with the topic only: expanded with public/default
+	topic, err = ParseTopicName("my-topic")
+	assert.Nil(t, err)
+	assert.Equal(t, "persistent://public/default/my-topic", topic.Name)
+	assert.Equal(t, "public/default", topic.Namespace)
+	assert.Equal(t, -1, topic.Partition)
+
+	// Short name with tenant and namespace
+	topic, err = ParseTopicName("my-tenant/my-namespace/my-topic")
+	assert.Nil(t, err)
+	assert.Equal(t, "persistent://my-tenant/my-namespace/my-topic", topic.Name)
+	assert.Equal(t, "my-tenant/my-namespace", topic.Namespace)
+	assert.Equal(t, -1, topic.Partition)
+
+	topic, err = ParseTopicName("non-persistent://my-tenant/my-namespace/my-topic")
+	assert.Nil(t, err)
+	assert.Equal(t, "non-persistent://my-tenant/my-namespace/my-topic", topic.Name)
+	assert.Equal(t, "my-tenant/my-namespace", topic.Namespace)
+	assert.Equal(t, -1, topic.Partition)
+
+
+	// Name carrying a partition index
+	topic, err = ParseTopicName("my-topic-partition-5")
+	assert.Nil(t, err)
+	assert.Equal(t, "persistent://public/default/my-topic-partition-5", topic.Name)
+	assert.Equal(t, "public/default", topic.Namespace)
+	assert.Equal(t, 5, topic.Partition)
+
+	// V1 topic name
+	topic, err = ParseTopicName("persistent://my-tenant/my-cluster/my-ns/my-topic")
+	assert.Nil(t, err)
+	assert.Equal(t, "persistent://my-tenant/my-cluster/my-ns/my-topic", topic.Name)
+	assert.Equal(t, "my-tenant/my-cluster/my-ns", topic.Namespace)
+	assert.Equal(t, -1, topic.Partition)
+}
+
+// TestParseTopicNameErrors checks that malformed topic names are rejected.
+func TestParseTopicNameErrors(t *testing.T) {
+	// Unknown domain
+	_, err := ParseTopicName("invalid://my-tenant/my-ns/my-topic")
+	assert.NotNil(t, err)
+
+	// NOTE(review): this name is already rejected because of its domain, so
+	// the non-numeric partition suffix is not what is being exercised here.
+	_, err = ParseTopicName("invalid://my-tenant/my-ns/my-topic-partition-xyz")
+	assert.NotNil(t, err)
+
+	// Short name with too many components
+	_, err = ParseTopicName("my-tenant/my-ns/my-topic-partition-xyz/invalid")
+	assert.NotNil(t, err)
+
+	// Fully qualified names with too few components
+	_, err = ParseTopicName("persistent://my-tenant")
+	assert.NotNil(t, err)
+
+	_, err = ParseTopicName("persistent://my-tenant/my-namespace")
+	assert.NotNil(t, err)
+
+	// Partition suffix that is not followed by a number
+	_, err = ParseTopicName("persistent://my-tenant/my-cluster/my-ns/my-topic-partition-xyz/invalid")
+	assert.NotNil(t, err)
+}
diff --git a/pulsar/impl_client.go b/pulsar/impl_client.go
new file mode 100644
index 0000000..e143601
--- /dev/null
+++ b/pulsar/impl_client.go
@@ -0,0 +1,98 @@
+package pulsar
+
+import (
+	"fmt"
+	log "github.com/sirupsen/logrus"
+	"net/url"
+	"pulsar-client-go-native/pulsar/impl"
+	pb "pulsar-client-go-native/pulsar/pulsar_proto"
+)
+
+// client is the Client implementation returned by newClient.
+type client struct {
+	options ClientOptions
+
+	cnxPool       impl.ConnectionPool
+	rpcClient     impl.RpcClient
+	lookupService impl.LookupService
+
+	// handlers is the set of closable resources (such as producers) created
+	// through this client; Close iterates over it.
+	handlers map[impl.Closable]bool
+}
+
+// newClient validates the options and builds a client together with its
+// connection pool, RPC client and lookup service.
+//
+// It returns an InvalidConfiguration error when the URL is empty, cannot be
+// parsed, or does not use the "pulsar" scheme.
+func newClient(options ClientOptions) (Client, error) {
+	if options.URL == "" {
+		return nil, newError(InvalidConfiguration, "URL is required for client")
+	}
+
+	serviceUrl, err := url.Parse(options.URL)
+	if err != nil {
+		log.WithError(err).Error("Failed to parse service URL")
+		return nil, newError(InvalidConfiguration, "Invalid service URL")
+	}
+
+	if serviceUrl.Scheme != "pulsar" {
+		return nil, newError(InvalidConfiguration, fmt.Sprintf("Invalid URL scheme '%s'", serviceUrl.Scheme))
+	}
+
+	c := &client{
+		options: options,
+		cnxPool: impl.NewConnectionPool(),
+		// The map must exist before CreateProducer registers a handler:
+		// writing to a nil map panics.
+		handlers: make(map[impl.Closable]bool),
+	}
+	c.rpcClient = impl.NewRpcClient(serviceUrl, c.cnxPool)
+	c.lookupService = impl.NewLookupService(c.rpcClient)
+	return c, nil
+}
+
+// CreateProducer creates a producer with the given options and registers it
+// with the client so that it is closed by Close.
+func (client *client) CreateProducer(options ProducerOptions) (Producer, error) {
+	producer, err := newProducer(client, options)
+	if err != nil {
+		// Return an untyped nil: wrapping the nil *producer in the Producer
+		// interface would make the result compare as non-nil.
+		return nil, err
+	}
+
+	if client.handlers == nil {
+		client.handlers = make(map[impl.Closable]bool)
+	}
+	client.handlers[producer] = true
+	return producer, nil
+}
+
+// Subscribe is not implemented yet: it returns a nil Consumer and a nil error.
+func (client *client) Subscribe(options ConsumerOptions) (Consumer, error) {
+	// TODO: Implement consumer
+	return nil, nil
+}
+
+// CreateReader is not implemented yet: it returns a nil Reader and a nil error.
+func (client *client) CreateReader(options ReaderOptions) (Reader, error) {
+	// TODO: Implement reader
+	return nil, nil
+}
+
+// TopicPartitions returns the names of the topics that make up the given
+// topic.
+//
+// For a partitioned topic the result holds one "<topic>-partition-<i>" entry
+// per partition. For a topic reported with zero partitions the result holds
+// the fully qualified topic name only, so callers always get at least one
+// topic to work with. An error is returned when the name cannot be parsed,
+// the request fails, or the broker reports an error.
+func (client *client) TopicPartitions(topic string) ([]string, error) {
+	topicName, err := impl.ParseTopicName(topic)
+	if err != nil {
+		return nil, err
+	}
+
+	id := client.rpcClient.NewRequestId()
+	res, err := client.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_PARTITIONED_METADATA,
+		&pb.CommandPartitionedTopicMetadata{
+			RequestId: &id,
+			Topic:     &topicName.Name,
+		})
+	if err != nil {
+		return nil, err
+	}
+
+	r := res.Response.PartitionMetadataResponse
+	if r == nil {
+		return nil, newError(LookupError, "Missing partitioned metadata response")
+	}
+	if r.Error != nil {
+		return nil, newError(LookupError, r.GetError().String())
+	}
+
+	numPartitions := int(r.GetPartitions())
+	if numPartitions == 0 {
+		// Non-partitioned topic
+		return []string{topicName.Name}, nil
+	}
+
+	partitions := make([]string, numPartitions)
+	for i := range partitions {
+		partitions[i] = fmt.Sprintf("%s-partition-%d", topic, i)
+	}
+	return partitions, nil
+}
+
+func (client *client) Close() error {
+	for handler := range client.handlers {
+		if err := handler.Close(); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
new file mode 100644
index 0000000..bb96a27
--- /dev/null
+++ b/pulsar/impl_partition_producer.go
@@ -0,0 +1,88 @@
+package pulsar
+
+import (
+	"context"
+	log "github.com/sirupsen/logrus"
+	"pulsar-client-go-native/pulsar/impl"
+	pb "pulsar-client-go-native/pulsar/pulsar_proto"
+	"sync"
+)
+
+// partitionProducer is the producer for a single topic (or topic partition).
+type partitionProducer struct {
+	client *client
+	topic  string
+	// log is a logger entry tagged with the topic.
+	log   *log.Entry
+	mutex sync.Mutex
+	cond  *sync.Cond
+}
+
+// newPartitionProducer creates the producer for a single topic and connects
+// it through grabCnx. It returns the error reported by grabCnx when the
+// connection cannot be set up. The options are not used yet.
+func newPartitionProducer(client *client, topic string, options *ProducerOptions) (*partitionProducer, error) {
+	p := &partitionProducer{
+		// grabCnx dereferences client and reads topic, so both must be set.
+		client: client,
+		topic:  topic,
+		log:    log.WithField("topic", topic),
+	}
+	p.cond = sync.NewCond(&p.mutex)
+
+	if err := p.grabCnx(); err != nil {
+		p.log.WithError(err).Errorf("Failed to create producer")
+		return nil, err
+	}
+
+	p.log.Info("Created producer")
+	return p, nil
+}
+
+// grabCnx looks up the broker for the producer's topic, sends it a PRODUCER
+// command and fetches the matching connection from the pool.
+//
+// NOTE(review): this function is unfinished and does not compile as written:
+// `*pb.CommandProducer{...}` is not a valid expression (a pointer to the
+// literal was presumably intended), the command is sent without any field
+// set, the result of Request is discarded, and the trailing `cnx.` statement
+// is incomplete.
+func (p *partitionProducer) grabCnx() error {
+	lr, err := p.client.lookupService.Lookup(p.topic)
+	if err != nil {
+		p.log.WithError(err).Warn("Failed to lookup topic")
+		return err
+	}
+
+	// NOTE(review): lr is dereferenced without a nil check -- confirm that
+	// Lookup never returns a nil result together with a nil error.
+	id := p.client.rpcClient.NewRequestId()
+	p.client.rpcClient.Request(lr.LogicalAddr.Host, lr.PhysicalAddr.Host, id, pb.BaseCommand_PRODUCER, *pb.CommandProducer{
+
+	})
+
+	var cnx impl.Connection
+	cnx, err = p.client.cnxPool.GetConnection(lr.LogicalAddr.Host, lr.PhysicalAddr.Host)
+	if err != nil {
+		p.log.WithError(err).Warn("Failed to get connection")
+		return err
+	}
+
+	cnx.
+
+	return nil
+}
+
+// run is an empty placeholder; it is not implemented yet.
+func (p *partitionProducer) run() {
+
+}
+
+// Topic returns the topic this producer was created for.
+func (p *partitionProducer) Topic() string {
+	return p.topic
+}
+
+// Name is not implemented yet and always returns an empty string.
+func (p *partitionProducer) Name() string {
+	return ""
+}
+
+// Send is not implemented yet: it ignores its arguments and returns nil.
+func (p *partitionProducer) Send(context.Context, ProducerMessage) error {
+	return nil
+}
+
+// SendAsync is not implemented yet: it ignores its arguments and never
+// invokes the callback.
+func (p *partitionProducer) SendAsync(context.Context, ProducerMessage, func(ProducerMessage, error)) {
+}
+
+// LastSequenceID is not implemented yet and always returns -1.
+func (p *partitionProducer) LastSequenceID() int64 {
+	return -1
+}
+
+// Flush is not implemented yet and always returns nil.
+func (p *partitionProducer) Flush() error {
+	return nil
+}
+
+// Close is not implemented yet and always returns nil.
+func (p *partitionProducer) Close() error {
+	return nil
+}
diff --git a/pulsar/impl_producer.go b/pulsar/impl_producer.go
new file mode 100644
index 0000000..f2332ea
--- /dev/null
+++ b/pulsar/impl_producer.go
@@ -0,0 +1,123 @@
+package pulsar
+
+import (
+	"context"
+	"pulsar-client-go-native/pulsar/impl"
+	"sync"
+)
+
+// producer is the Producer implementation; it spreads messages over one
+// Producer per topic partition.
+type producer struct {
+	topic string
+	// producers holds one Producer per partition, indexed by partition.
+	producers []Producer
+	// messageRouter picks the index in producers a message is sent to.
+	messageRouter func(ProducerMessage, TopicMetadata) int
+}
+
+// newProducer creates a producer for options.Topic with one partition
+// producer per topic partition; the partition producers are created
+// concurrently.
+//
+// It returns an error when the topic name is empty, when a custom message
+// router is configured (not supported yet), when the partitions cannot be
+// listed, or when any partition producer cannot be created. In the last case
+// the partition producers that were created are closed again and the first
+// error received is returned.
+func newProducer(client *client, options ProducerOptions) (*producer, error) {
+	if options.Topic == "" {
+		return nil, newError(InvalidTopicName, "Topic name is required for producer")
+	}
+
+	if options.MessageRouter != nil {
+		// The custom router takes a Message while routing happens on a
+		// ProducerMessage, so it cannot be plugged in. Fail here rather than
+		// leaving the router unset and crashing on the first send.
+		return nil, newError(InvalidConfiguration, "Custom message routers are not supported yet")
+	}
+
+	p := &producer{
+		topic: options.Topic,
+	}
+
+	internalRouter := impl.NewDefaultRouter(options.BatchingMaxPublishDelay)
+	p.messageRouter = func(message ProducerMessage, metadata TopicMetadata) int {
+		return internalRouter(metadata.NumPartitions())
+	}
+
+	partitions, err := client.TopicPartitions(options.Topic)
+	if err != nil {
+		return nil, err
+	}
+
+	numPartitions := len(partitions)
+	p.producers = make([]Producer, numPartitions)
+
+	type ProducerError struct {
+		partition int
+		Producer
+		error
+	}
+
+	// Buffered so that no creating goroutine ever blocks on the send.
+	c := make(chan ProducerError, numPartitions)
+
+	for i, partitionTopic := range partitions {
+		go func(partition int, topic string) {
+			prod, err := newPartitionProducer(client, topic, &options)
+			if err != nil {
+				// Leave Producer unset: storing the nil *partitionProducer
+				// would yield a non-nil interface value.
+				c <- ProducerError{partition: partition, error: err}
+				return
+			}
+			c <- ProducerError{partition: partition, Producer: prod}
+		}(i, partitionTopic)
+	}
+
+	// Wait for every partition and remember the first failure instead of
+	// letting a later success overwrite it.
+	var firstErr error
+	for i := 0; i < numPartitions; i++ {
+		pe := <-c
+		if pe.error != nil {
+			if firstErr == nil {
+				firstErr = pe.error
+			}
+			continue
+		}
+		p.producers[pe.partition] = pe.Producer
+	}
+
+	if firstErr != nil {
+		// Since there were some failures, cleanup all the partitions that succeeded in creating the producers
+		for _, created := range p.producers {
+			if created != nil {
+				// Best-effort cleanup: the creation error is the one reported.
+				_ = created.Close()
+			}
+		}
+		return nil, firstErr
+	}
+
+	return p, nil
+}
+
+// Topic returns the topic name the producer was created with.
+func (p *producer) Topic() string {
+	return p.topic
+}
+
+// Name returns the name of the first partition producer.
+//
+// NOTE(review): this indexes producers[0] and therefore panics when the
+// producer has no partition producers -- confirm that cannot happen.
+func (p *producer) Name() string {
+	return p.producers[0].Name()
+}
+
+// NumPartitions returns the number of partition producers.
+func (p *producer) NumPartitions() uint32 {
+	return uint32(len(p.producers))
+}
+
+// Send publishes msg through SendAsync and blocks until the send callback has
+// been invoked, returning the error the callback received.
+func (p *producer) Send(ctx context.Context, msg ProducerMessage) error {
+	var (
+		done    sync.WaitGroup
+		sendErr error
+	)
+	done.Add(1)
+
+	onComplete := func(message ProducerMessage, e error) {
+		sendErr = e
+		done.Done()
+	}
+	p.SendAsync(ctx, msg, onComplete)
+
+	done.Wait()
+	return sendErr
+}
+
+// SendAsync asks the message router for a partition and forwards the message
+// and callback to that partition's producer.
+func (p *producer) SendAsync(ctx context.Context, msg ProducerMessage, callback func(ProducerMessage, error)) {
+	target := p.producers[p.messageRouter(msg, p)]
+	target.SendAsync(ctx, msg, callback)
+}
+
+func (p *producer) LastSequenceID() int64 {
+	var maxSeq int64 = -1
+	for _, pp := range p.producers {
+		s := pp.LastSequenceID()
+		if s > maxSeq {
+			maxSeq = s
+		}
+	}
+	return maxSeq
+}
+
+func (p *producer) Flush() error {
+	return nil
+}
+
+func (p *producer) Close() error {
+	return nil
+}
diff --git a/pulsar/message.go b/pulsar/message.go
new file mode 100644
index 0000000..b829b96
--- /dev/null
+++ b/pulsar/message.go
@@ -0,0 +1,90 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import "time"
+
+// ProducerMessage is a message to be published by a Producer.
+type ProducerMessage struct {
+	// Payload for the message
+	Payload []byte
+
+	// Sets the key of the message for routing policy
+	Key string
+
+	// Attach application defined properties on the message
+	Properties map[string]string
+
+	// Set the event time for a given message
+	EventTime time.Time
+
+	// Override the replication clusters for this message.
+	ReplicationClusters []string
+
+	// Set the sequence id to assign to the current message
+	SequenceID int64
+}
+
+// Message is a message received from a topic.
+type Message interface {
+	// Get the topic from which this message originated
+	Topic() string
+
+	// Return the properties attached to the message.
+	// Properties are application defined key/value pairs that will be attached to the message
+	Properties() map[string]string
+
+	// Get the payload of the message
+	Payload() []byte
+
+	// Get the unique message ID associated with this message.
+	// The message id can be used to unambiguously refer to a message without having to keep the entire payload
+	// in memory.
+	ID() MessageID
+
+	// Get the publish time of this message. The publish time is the timestamp at which a client published the
+	// message.
+	PublishTime() time.Time
+
+	// Get the event time associated with this message. It is typically set by the applications via
+	// `ProducerMessage.EventTime`.
+	// If there isn't any event time associated with this event, it will be nil.
+	EventTime() *time.Time
+
+	// Get the key of the message, if any
+	Key() string
+}
+
+// Identifier for a particular message
+// MessageID identifies a particular message.
+type MessageID interface {
+	// Serialize the message id into a sequence of bytes that can be stored somewhere else
+	Serialize() []byte
+}
+
+// Reconstruct a MessageID object from its serialized representation
+// DeserializeMessageID is meant to reconstruct a MessageID from its
+// serialized representation. It is not implemented yet and always returns nil.
+func DeserializeMessageID(data []byte) MessageID {
+	// TODO
+	//return deserializeMessageId(data)
+	return nil
+}
+
+var (
+	// MessageID that points to the earliest message available in a topic
+	//EarliestMessage MessageID = earliestMessageID()
+
+	// MessageID that points to the latest message
+	//LatestMessage MessageID = latestMessageID()
+)
diff --git a/pulsar/producer.go b/pulsar/producer.go
new file mode 100644
index 0000000..90769cb
--- /dev/null
+++ b/pulsar/producer.go
@@ -0,0 +1,187 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import (
+	"context"
+	"time"
+)
+
+// MessageRoutingMode selects how messages are distributed over the partitions
+// of a partitioned topic.
+type MessageRoutingMode int
+
+const (
+	// Publish messages across all partitions in round-robin.
+	RoundRobinDistribution MessageRoutingMode = iota
+
+	// The producer will choose one single partition and publish all the messages into that partition
+	UseSinglePartition
+
+	// Use custom message router implementation that will be called to determine the partition for a particular message.
+	CustomPartition
+)
+
+// HashingScheme selects the hash function used to choose the partition a
+// message is published to.
+type HashingScheme int
+
+const (
+	JavaStringHash HashingScheme = iota // Java String.hashCode() equivalent
+	Murmur3_32Hash                      // Use Murmur3 hashing function
+	BoostHash                           // C++ based boost::hash
+)
+
+// CompressionType selects the compression applied by the producer.
+type CompressionType int
+
+const (
+	// NoCompression is the zero value.
+	NoCompression CompressionType = iota
+	LZ4
+	ZLib
+	ZSTD
+)
+
+// TopicMetadata exposes the information about a topic that is passed to a
+// message router.
+type TopicMetadata interface {
+	// Get the number of partitions for the specific topic
+	NumPartitions() uint32
+}
+
+// ProducerOptions holds the configuration used to create a Producer.
+type ProducerOptions struct {
+	// Specify the topic this producer will be publishing on.
+	// This argument is required when constructing the producer.
+	Topic string
+
+	// Specify a name for the producer
+	// If not assigned, the system will generate a globally unique name which can be accessed with
+	// Producer.Name().
+	// When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique
+	// across all Pulsar's clusters. Brokers will enforce that only a single producer with a given name can be
+	// publishing on a topic.
+	Name string
+
+	// Attach a set of application defined properties to the producer
+	// These properties will be visible in the topic stats
+	Properties map[string]string
+
+	// Set the send timeout (default: 30 seconds)
+	// If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.
+	// Setting the timeout to -1, will set the timeout to infinity, which can be useful when using Pulsar's message
+	// deduplication feature.
+	SendTimeout time.Duration
+
+	// Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
+	// When the queue is full, by default, all calls to Producer.Send() and Producer.SendAsync() will fail
+	// unless `BlockIfQueueFull` is set to true. Use `BlockIfQueueFull` to change the blocking behavior.
+	MaxPendingMessages int
+
+	// Set the number of max pending messages across all the partitions
+	// This setting will be used to lower the max pending messages for each partition
+	// (`MaxPendingMessages`), if the total exceeds the configured value.
+	MaxPendingMessagesAcrossPartitions int
+
+	// Set whether the `Producer.Send()` and `Producer.SendAsync()` operations should block when the outgoing
+	// message queue is full. Default is `false`. If set to `false`, send operations will immediately fail with
+	// `ProducerQueueIsFullError` when there is no space left in pending queue.
+	BlockIfQueueFull bool
+
+	// Set the message routing mode for the partitioned producer.
+	// Default routing mode is round-robin routing.
+	//
+	// This logic is applied when the application is not setting a key (`ProducerMessage.Key`) on a
+	// particular message.
+	MessageRoutingMode
+
+	// Change the `HashingScheme` used to choose the partition on where to publish a particular message.
+	// Standard hashing functions available are:
+	//
+	//  - `JavaStringHash` : Java String.hashCode() equivalent
+	//  - `Murmur3_32Hash` : Use Murmur3 hashing function.
+	// 		https://en.wikipedia.org/wiki/MurmurHash
+	//  - `BoostHash`      : C++ based boost::hash
+	//
+	// Default is `JavaStringHash`.
+	HashingScheme
+
+	// Set the compression type for the producer.
+	// By default, message payloads are not compressed. Supported compression types are:
+	//  - LZ4
+	//  - ZLIB
+	//  - ZSTD
+	//
+	// Note: ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
+	// release in order to be able to receive messages compressed with ZSTD.
+	CompressionType
+
+	// Set a custom message routing policy by passing an implementation of MessageRouter
+	// The router is a function that given a particular message and the topic metadata, returns the
+	// partition index where the message should be routed to
+	//
+	// NOTE(review): the router takes a `Message`, while producers publish `ProducerMessage` values --
+	// confirm which type is intended.
+	MessageRouter func(Message, TopicMetadata) int
+
+	// Control whether automatic batching of messages is enabled for the producer. Default: false [No batching]
+	//
+	// When batching is enabled, multiple calls to Producer.SendAsync can result in a single batch to be sent to the
+	// broker, leading to better throughput, especially when publishing small messages. If compression is enabled,
+	// messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or
+	// contents.
+	//
+	// When enabled default batch delay is set to 1 ms and default batch size is 1000 messages
+	//
+	// NOTE(review): the 1 ms default stated here disagrees with the 10ms default documented on
+	// `BatchingMaxPublishDelay` -- confirm which one is correct.
+	Batching bool
+
+	// Set the time period within which the messages sent will be batched (default: 10ms) if batch messages are
+	// enabled. If set to a non zero value, messages will be queued until this time interval has elapsed or until
+	// the maximum number of messages in a batch (`BatchingMaxMessages`) is reached
+	BatchingMaxPublishDelay time.Duration
+
+	// Set the maximum number of messages permitted in a batch. (default: 1000) If set to a value greater than 1,
+	// messages will be queued until this threshold is reached or batch interval has elapsed
+	BatchingMaxMessages uint
+}
+
+// The producer is used to publish messages on a topic
+// The producer is used to publish messages on a topic
+type Producer interface {
+	// return the topic to which the producer is publishing
+	Topic() string
+
+	// return the producer name which could have been assigned by the system or specified by the client
+	Name() string
+
+	// Send a message
+	// This call will block until the message is successfully acknowledged by the Pulsar broker.
+	// Example:
+	// producer.Send(ctx, pulsar.ProducerMessage{ Payload: myPayload })
+	Send(context.Context, ProducerMessage) error
+
+	// Send a message in asynchronous mode
+	// The callback will report back the message being published and
+	// the eventual error in publishing
+	SendAsync(context.Context, ProducerMessage, func(ProducerMessage, error))
+
+	// Get the last sequence id that was published by this producer.
+	// This represents either the automatically assigned or custom sequence id (set on the ProducerMessage) that
+	// was published and acknowledged by the broker.
+	// After recreating a producer with the same producer name, this will return the last message that was
+	// published in the previous producer session, or -1 if no message was ever published.
+	// return the last sequence id published by this producer.
+	LastSequenceID() int64
+
+	// Flush all the messages buffered in the client and wait until all messages have been successfully
+	// persisted.
+	Flush() error
+
+	// Close the producer and release the resources allocated
+	// No more writes will be accepted from this producer. Waits until all pending write requests are persisted.
+	// In case of errors, pending writes will not be retried.
+	Close() error
+}
diff --git a/pulsar/pulsar_proto/PulsarApi.pb.go b/pulsar/pulsar_proto/PulsarApi.pb.go
new file mode 100644
index 0000000..cb82acd
--- /dev/null
+++ b/pulsar/pulsar_proto/PulsarApi.pb.go
@@ -0,0 +1,4043 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: PulsarApi.proto
+
+package pulsar_proto
+
+import proto "github.com/golang/protobuf/proto"
+import fmt "fmt"
+import math "math"
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+type CompressionType int32 // Generated enum type; its values are the CompressionType_* constants declared below.
+
+const (
+	CompressionType_NONE CompressionType = 0
+	CompressionType_LZ4  CompressionType = 1
+	CompressionType_ZLIB CompressionType = 2
+	CompressionType_ZSTD CompressionType = 3
+)
+
+var CompressionType_name = map[int32]string{ // enum number -> name
+	0: "NONE",
+	1: "LZ4",
+	2: "ZLIB",
+	3: "ZSTD",
+}
+var CompressionType_value = map[string]int32{ // enum name -> number
+	"NONE": 0,
+	"LZ4":  1,
+	"ZLIB": 2,
+	"ZSTD": 3,
+}
+
+func (x CompressionType) Enum() *CompressionType { // returns a pointer to a newly allocated copy of x
+	p := new(CompressionType)
+	*p = x
+	return p
+}
+func (x CompressionType) String() string { // looks x up in CompressionType_name via proto.EnumName
+	return proto.EnumName(CompressionType_name, int32(x))
+}
+func (x *CompressionType) UnmarshalJSON(data []byte) error { // parses data against CompressionType_value; *x is only written on success
+	value, err := proto.UnmarshalJSONEnum(CompressionType_value, data, "CompressionType")
+	if err != nil {
+		return err
+	}
+	*x = CompressionType(value)
+	return nil
+}
+func (CompressionType) EnumDescriptor() ([]byte, []int) { // returns the file descriptor variable together with the path []int{0}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{0}
+}
+
+type ServerError int32 // Generated enum type; its values are the ServerError_* constants declared below.
+
+const (
+	ServerError_UnknownError        ServerError = 0
+	ServerError_MetadataError       ServerError = 1
+	ServerError_PersistenceError    ServerError = 2
+	ServerError_AuthenticationError ServerError = 3
+	ServerError_AuthorizationError  ServerError = 4
+	ServerError_ConsumerBusy        ServerError = 5
+	// NOTE(review): "other consumers are connected" reads like the tail of the ConsumerBusy description above, not a description of ServiceNotReady — confirm against PulsarApi.proto.
+	ServerError_ServiceNotReady                       ServerError = 6
+	ServerError_ProducerBlockedQuotaExceededError     ServerError = 7
+	ServerError_ProducerBlockedQuotaExceededException ServerError = 8
+	ServerError_ChecksumError                         ServerError = 9
+	ServerError_UnsupportedVersionError               ServerError = 10
+	ServerError_TopicNotFound                         ServerError = 11
+	ServerError_SubscriptionNotFound                  ServerError = 12
+	ServerError_ConsumerNotFound                      ServerError = 13
+	ServerError_TooManyRequests                       ServerError = 14
+	ServerError_TopicTerminatedError                  ServerError = 15
+	ServerError_ProducerBusy                          ServerError = 16
+	ServerError_InvalidTopicName                      ServerError = 17
+	ServerError_IncompatibleSchema                    ServerError = 18
+)
+
+var ServerError_name = map[int32]string{ // enum number -> name
+	0:  "UnknownError",
+	1:  "MetadataError",
+	2:  "PersistenceError",
+	3:  "AuthenticationError",
+	4:  "AuthorizationError",
+	5:  "ConsumerBusy",
+	6:  "ServiceNotReady",
+	7:  "ProducerBlockedQuotaExceededError",
+	8:  "ProducerBlockedQuotaExceededException",
+	9:  "ChecksumError",
+	10: "UnsupportedVersionError",
+	11: "TopicNotFound",
+	12: "SubscriptionNotFound",
+	13: "ConsumerNotFound",
+	14: "TooManyRequests",
+	15: "TopicTerminatedError",
+	16: "ProducerBusy",
+	17: "InvalidTopicName",
+	18: "IncompatibleSchema",
+}
+var ServerError_value = map[string]int32{ // enum name -> number
+	"UnknownError":                          0,
+	"MetadataError":                         1,
+	"PersistenceError":                      2,
+	"AuthenticationError":                   3,
+	"AuthorizationError":                    4,
+	"ConsumerBusy":                          5,
+	"ServiceNotReady":                       6,
+	"ProducerBlockedQuotaExceededError":     7,
+	"ProducerBlockedQuotaExceededException": 8,
+	"ChecksumError":                         9,
+	"UnsupportedVersionError":               10,
+	"TopicNotFound":                         11,
+	"SubscriptionNotFound":                  12,
+	"ConsumerNotFound":                      13,
+	"TooManyRequests":                       14,
+	"TopicTerminatedError":                  15,
+	"ProducerBusy":                          16,
+	"InvalidTopicName":                      17,
+	"IncompatibleSchema":                    18,
+}
+
+func (x ServerError) Enum() *ServerError { // returns a pointer to a newly allocated copy of x
+	p := new(ServerError)
+	*p = x
+	return p
+}
+func (x ServerError) String() string { // looks x up in ServerError_name via proto.EnumName
+	return proto.EnumName(ServerError_name, int32(x))
+}
+func (x *ServerError) UnmarshalJSON(data []byte) error { // parses data against ServerError_value; *x is only written on success
+	value, err := proto.UnmarshalJSONEnum(ServerError_value, data, "ServerError")
+	if err != nil {
+		return err
+	}
+	*x = ServerError(value)
+	return nil
+}
+func (ServerError) EnumDescriptor() ([]byte, []int) { // returns the file descriptor variable together with the path []int{1}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{1}
+}
+
+type AuthMethod int32 // Generated enum type; its values are the AuthMethod_* constants declared below.
+
+const (
+	AuthMethod_AuthMethodNone   AuthMethod = 0
+	AuthMethod_AuthMethodYcaV1  AuthMethod = 1
+	AuthMethod_AuthMethodAthens AuthMethod = 2
+)
+
+var AuthMethod_name = map[int32]string{ // enum number -> name
+	0: "AuthMethodNone",
+	1: "AuthMethodYcaV1",
+	2: "AuthMethodAthens",
+}
+var AuthMethod_value = map[string]int32{ // enum name -> number
+	"AuthMethodNone":   0,
+	"AuthMethodYcaV1":  1,
+	"AuthMethodAthens": 2,
+}
+
+func (x AuthMethod) Enum() *AuthMethod { // returns a pointer to a newly allocated copy of x
+	p := new(AuthMethod)
+	*p = x
+	return p
+}
+func (x AuthMethod) String() string { // looks x up in AuthMethod_name via proto.EnumName
+	return proto.EnumName(AuthMethod_name, int32(x))
+}
+func (x *AuthMethod) UnmarshalJSON(data []byte) error { // parses data against AuthMethod_value; *x is only written on success
+	value, err := proto.UnmarshalJSONEnum(AuthMethod_value, data, "AuthMethod")
+	if err != nil {
+		return err
+	}
+	*x = AuthMethod(value)
+	return nil
+}
+func (AuthMethod) EnumDescriptor() ([]byte, []int) { // returns the file descriptor variable together with the path []int{2}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{2}
+}
+
+// Each protocol version identifies new features that are
+// incrementally added to the protocol.
+type ProtocolVersion int32
+
+const (
+	ProtocolVersion_v0  ProtocolVersion = 0
+	ProtocolVersion_v1  ProtocolVersion = 1
+	ProtocolVersion_v2  ProtocolVersion = 2
+	ProtocolVersion_v3  ProtocolVersion = 3
+	ProtocolVersion_v4  ProtocolVersion = 4
+	ProtocolVersion_v5  ProtocolVersion = 5
+	ProtocolVersion_v6  ProtocolVersion = 6
+	ProtocolVersion_v7  ProtocolVersion = 7
+	ProtocolVersion_v8  ProtocolVersion = 8
+	ProtocolVersion_v9  ProtocolVersion = 9
+	ProtocolVersion_v10 ProtocolVersion = 10
+	ProtocolVersion_v11 ProtocolVersion = 11
+	ProtocolVersion_v12 ProtocolVersion = 12
+	// Added CommandActiveConsumerChange
+	// Added CommandGetTopicsOfNamespace (NOTE(review): these two notes may be trailing comments of v12 in PulsarApi.proto rather than a description of v13 — confirm)
+	ProtocolVersion_v13 ProtocolVersion = 13
+)
+
+var ProtocolVersion_name = map[int32]string{ // enum number -> name
+	0:  "v0",
+	1:  "v1",
+	2:  "v2",
+	3:  "v3",
+	4:  "v4",
+	5:  "v5",
+	6:  "v6",
+	7:  "v7",
+	8:  "v8",
+	9:  "v9",
+	10: "v10",
+	11: "v11",
+	12: "v12",
+	13: "v13",
+}
+var ProtocolVersion_value = map[string]int32{ // enum name -> number
+	"v0":  0,
+	"v1":  1,
+	"v2":  2,
+	"v3":  3,
+	"v4":  4,
+	"v5":  5,
+	"v6":  6,
+	"v7":  7,
+	"v8":  8,
+	"v9":  9,
+	"v10": 10,
+	"v11": 11,
+	"v12": 12,
+	"v13": 13,
+}
+
+func (x ProtocolVersion) Enum() *ProtocolVersion { // returns a pointer to a newly allocated copy of x
+	p := new(ProtocolVersion)
+	*p = x
+	return p
+}
+func (x ProtocolVersion) String() string { // looks x up in ProtocolVersion_name via proto.EnumName
+	return proto.EnumName(ProtocolVersion_name, int32(x))
+}
+func (x *ProtocolVersion) UnmarshalJSON(data []byte) error { // parses data against ProtocolVersion_value; *x is only written on success
+	value, err := proto.UnmarshalJSONEnum(ProtocolVersion_value, data, "ProtocolVersion")
+	if err != nil {
+		return err
+	}
+	*x = ProtocolVersion(value)
+	return nil
+}
+func (ProtocolVersion) EnumDescriptor() ([]byte, []int) { // returns the file descriptor variable together with the path []int{3}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{3}
+}
+
+type Schema_Type int32 // Generated enum type; its values are the Schema_* constants in the const block below.
+
+const (
+	Schema_None     Schema_Type = 0
+	Schema_String   Schema_Type = 1
+	Schema_Json     Schema_Type = 2
+	Schema_Protobuf Schema_Type = 3
+	Schema_Avro     Schema_Type = 4
+)
+
+var Schema_Type_name = map[int32]string{ // enum number -> name
+	0: "None",
+	1: "String",
+	2: "Json",
+	3: "Protobuf",
+	4: "Avro",
+}
+var Schema_Type_value = map[string]int32{ // enum name -> number
+	"None":     0,
+	"String":   1,
+	"Json":     2,
+	"Protobuf": 3,
+	"Avro":     4,
+}
+
+func (x Schema_Type) Enum() *Schema_Type { // returns a pointer to a newly allocated copy of x
+	p := new(Schema_Type)
+	*p = x
+	return p
+}
+func (x Schema_Type) String() string { // looks x up in Schema_Type_name via proto.EnumName
+	return proto.EnumName(Schema_Type_name, int32(x))
+}
+func (x *Schema_Type) UnmarshalJSON(data []byte) error { // parses data against Schema_Type_value; *x is only written on success
+	value, err := proto.UnmarshalJSONEnum(Schema_Type_value, data, "Schema_Type")
+	if err != nil {
+		return err
+	}
+	*x = Schema_Type(value)
+	return nil
+}
+func (Schema_Type) EnumDescriptor() ([]byte, []int) { // returns the file descriptor variable together with the path []int{0, 0}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{0, 0}
+}
+
+type CommandSubscribe_SubType int32 // Generated enum type; its values are the constants in the const block below.
+
+const (
+	CommandSubscribe_Exclusive CommandSubscribe_SubType = 0
+	CommandSubscribe_Shared    CommandSubscribe_SubType = 1
+	CommandSubscribe_Failover  CommandSubscribe_SubType = 2
+)
+
+var CommandSubscribe_SubType_name = map[int32]string{ // enum number -> name
+	0: "Exclusive",
+	1: "Shared",
+	2: "Failover",
+}
+var CommandSubscribe_SubType_value = map[string]int32{ // enum name -> number
+	"Exclusive": 0,
+	"Shared":    1,
+	"Failover":  2,
+}
+
+func (x CommandSubscribe_SubType) Enum() *CommandSubscribe_SubType { // returns a pointer to a newly allocated copy of x
+	p := new(CommandSubscribe_SubType)
+	*p = x
+	return p
+}
+func (x CommandSubscribe_SubType) String() string { // looks x up in CommandSubscribe_SubType_name via proto.EnumName
+	return proto.EnumName(CommandSubscribe_SubType_name, int32(x))
+}
+func (x *CommandSubscribe_SubType) UnmarshalJSON(data []byte) error { // parses data against CommandSubscribe_SubType_value; *x is only written on success
+	value, err := proto.UnmarshalJSONEnum(CommandSubscribe_SubType_value, data, "CommandSubscribe_SubType")
+	if err != nil {
+		return err
+	}
+	*x = CommandSubscribe_SubType(value)
+	return nil
+}
+func (CommandSubscribe_SubType) EnumDescriptor() ([]byte, []int) { // returns the file descriptor variable together with the path []int{9, 0}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{9, 0}
+}
+
+type CommandSubscribe_InitialPosition int32 // Generated enum type; its values are the constants in the const block below.
+
+const (
+	CommandSubscribe_Latest   CommandSubscribe_InitialPosition = 0
+	CommandSubscribe_Earliest CommandSubscribe_InitialPosition = 1
+)
+
+var CommandSubscribe_InitialPosition_name = map[int32]string{ // enum number -> name
+	0: "Latest",
+	1: "Earliest",
+}
+var CommandSubscribe_InitialPosition_value = map[string]int32{ // enum name -> number
+	"Latest":   0,
+	"Earliest": 1,
+}
+
+func (x CommandSubscribe_InitialPosition) Enum() *CommandSubscribe_InitialPosition { // returns a pointer to a newly allocated copy of x
+	p := new(CommandSubscribe_InitialPosition)
+	*p = x
+	return p
+}
+func (x CommandSubscribe_InitialPosition) String() string { // looks x up in CommandSubscribe_InitialPosition_name via proto.EnumName
+	return proto.EnumName(CommandSubscribe_InitialPosition_name, int32(x))
+}
+func (x *CommandSubscribe_InitialPosition) UnmarshalJSON(data []byte) error { // parses data against CommandSubscribe_InitialPosition_value; *x is only written on success
+	value, err := proto.UnmarshalJSONEnum(CommandSubscribe_InitialPosition_value, data, "CommandSubscribe_InitialPosition")
+	if err != nil {
+		return err
+	}
+	*x = CommandSubscribe_InitialPosition(value)
+	return nil
+}
+func (CommandSubscribe_InitialPosition) EnumDescriptor() ([]byte, []int) { // returns the file descriptor variable together with the path []int{9, 1}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{9, 1}
+}
+
+type CommandPartitionedTopicMetadataResponse_LookupType int32 // Generated enum type; its values are the constants in the const block below.
+
+const (
+	CommandPartitionedTopicMetadataResponse_Success CommandPartitionedTopicMetadataResponse_LookupType = 0
+	CommandPartitionedTopicMetadataResponse_Failed  CommandPartitionedTopicMetadataResponse_LookupType = 1
+)
+
+var CommandPartitionedTopicMetadataResponse_LookupType_name = map[int32]string{ // enum number -> name
+	0: "Success",
+	1: "Failed",
+}
+var CommandPartitionedTopicMetadataResponse_LookupType_value = map[string]int32{ // enum name -> number
+	"Success": 0,
+	"Failed":  1,
+}
+
+func (x CommandPartitionedTopicMetadataResponse_LookupType) Enum() *CommandPartitionedTopicMetadataResponse_LookupType { // returns a pointer to a newly allocated copy of x
+	p := new(CommandPartitionedTopicMetadataResponse_LookupType)
+	*p = x
+	return p
+}
+func (x CommandPartitionedTopicMetadataResponse_LookupType) String() string { // looks x up in the _name map via proto.EnumName
+	return proto.EnumName(CommandPartitionedTopicMetadataResponse_LookupType_name, int32(x))
+}
+func (x *CommandPartitionedTopicMetadataResponse_LookupType) UnmarshalJSON(data []byte) error { // parses data against the _value map; *x is only written on success
+	value, err := proto.UnmarshalJSONEnum(CommandPartitionedTopicMetadataResponse_LookupType_value, data, "CommandPartitionedTopicMetadataResponse_LookupType")
+	if err != nil {
+		return err
+	}
+	*x = CommandPartitionedTopicMetadataResponse_LookupType(value)
+	return nil
+}
+func (CommandPartitionedTopicMetadataResponse_LookupType) EnumDescriptor() ([]byte, []int) { // returns the file descriptor variable together with the path []int{11, 0}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{11, 0}
+}
+
+type CommandLookupTopicResponse_LookupType int32 // Generated enum type; its values are the constants in the const block below.
+
+const (
+	CommandLookupTopicResponse_Redirect CommandLookupTopicResponse_LookupType = 0
+	CommandLookupTopicResponse_Connect  CommandLookupTopicResponse_LookupType = 1
+	CommandLookupTopicResponse_Failed   CommandLookupTopicResponse_LookupType = 2
+)
+
+var CommandLookupTopicResponse_LookupType_name = map[int32]string{ // enum number -> name
+	0: "Redirect",
+	1: "Connect",
+	2: "Failed",
+}
+var CommandLookupTopicResponse_LookupType_value = map[string]int32{ // enum name -> number
+	"Redirect": 0,
+	"Connect":  1,
+	"Failed":   2,
+}
+
+func (x CommandLookupTopicResponse_LookupType) Enum() *CommandLookupTopicResponse_LookupType { // returns a pointer to a newly allocated copy of x
+	p := new(CommandLookupTopicResponse_LookupType)
+	*p = x
+	return p
+}
+func (x CommandLookupTopicResponse_LookupType) String() string { // looks x up in the _name map via proto.EnumName
+	return proto.EnumName(CommandLookupTopicResponse_LookupType_name, int32(x))
+}
+func (x *CommandLookupTopicResponse_LookupType) UnmarshalJSON(data []byte) error { // parses data against the _value map; *x is only written on success
+	value, err := proto.UnmarshalJSONEnum(CommandLookupTopicResponse_LookupType_value, data, "CommandLookupTopicResponse_LookupType")
+	if err != nil {
+		return err
+	}
+	*x = CommandLookupTopicResponse_LookupType(value)
+	return nil
+}
+func (CommandLookupTopicResponse_LookupType) EnumDescriptor() ([]byte, []int) { // returns the file descriptor variable together with the path []int{13, 0}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{13, 0}
+}
+
+type CommandAck_AckType int32 // Generated enum type; its values are the constants in the const block below.
+
+const (
+	CommandAck_Individual CommandAck_AckType = 0
+	CommandAck_Cumulative CommandAck_AckType = 1
+)
+
+var CommandAck_AckType_name = map[int32]string{ // enum number -> name
+	0: "Individual",
+	1: "Cumulative",
+}
+var CommandAck_AckType_value = map[string]int32{ // enum name -> number
+	"Individual": 0,
+	"Cumulative": 1,
+}
+
+func (x CommandAck_AckType) Enum() *CommandAck_AckType { // returns a pointer to a newly allocated copy of x
+	p := new(CommandAck_AckType)
+	*p = x
+	return p
+}
+func (x CommandAck_AckType) String() string { // looks x up in CommandAck_AckType_name via proto.EnumName
+	return proto.EnumName(CommandAck_AckType_name, int32(x))
+}
+func (x *CommandAck_AckType) UnmarshalJSON(data []byte) error { // parses data against CommandAck_AckType_value; *x is only written on success
+	value, err := proto.UnmarshalJSONEnum(CommandAck_AckType_value, data, "CommandAck_AckType")
+	if err != nil {
+		return err
+	}
+	*x = CommandAck_AckType(value)
+	return nil
+}
+func (CommandAck_AckType) EnumDescriptor() ([]byte, []int) { // returns the file descriptor variable together with the path []int{19, 0}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{19, 0}
+}
+
+// Acks can contain a flag to indicate that the consumer
+// received an invalid message that got discarded
+// before being passed on to the application.
+type CommandAck_ValidationError int32
+
+const (
+	CommandAck_UncompressedSizeCorruption CommandAck_ValidationError = 0
+	CommandAck_DecompressionError         CommandAck_ValidationError = 1
+	CommandAck_ChecksumMismatch           CommandAck_ValidationError = 2
+	CommandAck_BatchDeSerializeError      CommandAck_ValidationError = 3
+	CommandAck_DecryptionError            CommandAck_ValidationError = 4
+)
+
+var CommandAck_ValidationError_name = map[int32]string{ // enum number -> name
+	0: "UncompressedSizeCorruption",
+	1: "DecompressionError",
+	2: "ChecksumMismatch",
+	3: "BatchDeSerializeError",
+	4: "DecryptionError",
+}
+var CommandAck_ValidationError_value = map[string]int32{ // enum name -> number
+	"UncompressedSizeCorruption": 0,
+	"DecompressionError":         1,
+	"ChecksumMismatch":           2,
+	"BatchDeSerializeError":      3,
+	"DecryptionError":            4,
+}
+
+func (x CommandAck_ValidationError) Enum() *CommandAck_ValidationError { // returns a pointer to a newly allocated copy of x
+	p := new(CommandAck_ValidationError)
+	*p = x
+	return p
+}
+func (x CommandAck_ValidationError) String() string { // looks x up in CommandAck_ValidationError_name via proto.EnumName
+	return proto.EnumName(CommandAck_ValidationError_name, int32(x))
+}
+func (x *CommandAck_ValidationError) UnmarshalJSON(data []byte) error { // parses data against CommandAck_ValidationError_value; *x is only written on success
+	value, err := proto.UnmarshalJSONEnum(CommandAck_ValidationError_value, data, "CommandAck_ValidationError")
+	if err != nil {
+		return err
+	}
+	*x = CommandAck_ValidationError(value)
+	return nil
+}
+func (CommandAck_ValidationError) EnumDescriptor() ([]byte, []int) { // returns the file descriptor variable together with the path []int{19, 1}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{19, 1}
+}
+
+type CommandGetTopicsOfNamespace_Mode int32 // Generated enum type; its values are the constants in the const block below.
+
+const (
+	CommandGetTopicsOfNamespace_PERSISTENT     CommandGetTopicsOfNamespace_Mode = 0
+	CommandGetTopicsOfNamespace_NON_PERSISTENT CommandGetTopicsOfNamespace_Mode = 1
+	CommandGetTopicsOfNamespace_ALL            CommandGetTopicsOfNamespace_Mode = 2
+)
+
+var CommandGetTopicsOfNamespace_Mode_name = map[int32]string{ // enum number -> name
+	0: "PERSISTENT",
+	1: "NON_PERSISTENT",
+	2: "ALL",
+}
+var CommandGetTopicsOfNamespace_Mode_value = map[string]int32{ // enum name -> number
+	"PERSISTENT":     0,
+	"NON_PERSISTENT": 1,
+	"ALL":            2,
+}
+
+func (x CommandGetTopicsOfNamespace_Mode) Enum() *CommandGetTopicsOfNamespace_Mode { // returns a pointer to a newly allocated copy of x
+	p := new(CommandGetTopicsOfNamespace_Mode)
+	*p = x
+	return p
+}
+func (x CommandGetTopicsOfNamespace_Mode) String() string { // looks x up in CommandGetTopicsOfNamespace_Mode_name via proto.EnumName
+	return proto.EnumName(CommandGetTopicsOfNamespace_Mode_name, int32(x))
+}
+func (x *CommandGetTopicsOfNamespace_Mode) UnmarshalJSON(data []byte) error { // parses data against CommandGetTopicsOfNamespace_Mode_value; *x is only written on success
+	value, err := proto.UnmarshalJSONEnum(CommandGetTopicsOfNamespace_Mode_value, data, "CommandGetTopicsOfNamespace_Mode")
+	if err != nil {
+		return err
+	}
+	*x = CommandGetTopicsOfNamespace_Mode(value)
+	return nil
+}
+func (CommandGetTopicsOfNamespace_Mode) EnumDescriptor() ([]byte, []int) { // returns the file descriptor variable together with the path []int{37, 0}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{37, 0}
+}
+
+type BaseCommand_Type int32 // Generated enum type; its values are the BaseCommand_* constants below. Note that the numbering starts at 2: no constant is declared for 0 or 1.
+
+const (
+	BaseCommand_CONNECT                           BaseCommand_Type = 2
+	BaseCommand_CONNECTED                         BaseCommand_Type = 3
+	BaseCommand_SUBSCRIBE                         BaseCommand_Type = 4
+	BaseCommand_PRODUCER                          BaseCommand_Type = 5
+	BaseCommand_SEND                              BaseCommand_Type = 6
+	BaseCommand_SEND_RECEIPT                      BaseCommand_Type = 7
+	BaseCommand_SEND_ERROR                        BaseCommand_Type = 8
+	BaseCommand_MESSAGE                           BaseCommand_Type = 9
+	BaseCommand_ACK                               BaseCommand_Type = 10
+	BaseCommand_FLOW                              BaseCommand_Type = 11
+	BaseCommand_UNSUBSCRIBE                       BaseCommand_Type = 12
+	BaseCommand_SUCCESS                           BaseCommand_Type = 13
+	BaseCommand_ERROR                             BaseCommand_Type = 14
+	BaseCommand_CLOSE_PRODUCER                    BaseCommand_Type = 15
+	BaseCommand_CLOSE_CONSUMER                    BaseCommand_Type = 16
+	BaseCommand_PRODUCER_SUCCESS                  BaseCommand_Type = 17
+	BaseCommand_PING                              BaseCommand_Type = 18
+	BaseCommand_PONG                              BaseCommand_Type = 19
+	BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES BaseCommand_Type = 20
+	BaseCommand_PARTITIONED_METADATA              BaseCommand_Type = 21
+	BaseCommand_PARTITIONED_METADATA_RESPONSE     BaseCommand_Type = 22
+	BaseCommand_LOOKUP                            BaseCommand_Type = 23
+	BaseCommand_LOOKUP_RESPONSE                   BaseCommand_Type = 24
+	BaseCommand_CONSUMER_STATS                    BaseCommand_Type = 25
+	BaseCommand_CONSUMER_STATS_RESPONSE           BaseCommand_Type = 26
+	BaseCommand_REACHED_END_OF_TOPIC              BaseCommand_Type = 27
+	BaseCommand_SEEK                              BaseCommand_Type = 28
+	BaseCommand_GET_LAST_MESSAGE_ID               BaseCommand_Type = 29
+	BaseCommand_GET_LAST_MESSAGE_ID_RESPONSE      BaseCommand_Type = 30
+	BaseCommand_ACTIVE_CONSUMER_CHANGE            BaseCommand_Type = 31
+	BaseCommand_GET_TOPICS_OF_NAMESPACE           BaseCommand_Type = 32
+	BaseCommand_GET_TOPICS_OF_NAMESPACE_RESPONSE  BaseCommand_Type = 33
+	BaseCommand_GET_SCHEMA                        BaseCommand_Type = 34
+	BaseCommand_GET_SCHEMA_RESPONSE               BaseCommand_Type = 35
+)
+
+var BaseCommand_Type_name = map[int32]string{ // enum number -> name
+	2:  "CONNECT",
+	3:  "CONNECTED",
+	4:  "SUBSCRIBE",
+	5:  "PRODUCER",
+	6:  "SEND",
+	7:  "SEND_RECEIPT",
+	8:  "SEND_ERROR",
+	9:  "MESSAGE",
+	10: "ACK",
+	11: "FLOW",
+	12: "UNSUBSCRIBE",
+	13: "SUCCESS",
+	14: "ERROR",
+	15: "CLOSE_PRODUCER",
+	16: "CLOSE_CONSUMER",
+	17: "PRODUCER_SUCCESS",
+	18: "PING",
+	19: "PONG",
+	20: "REDELIVER_UNACKNOWLEDGED_MESSAGES",
+	21: "PARTITIONED_METADATA",
+	22: "PARTITIONED_METADATA_RESPONSE",
+	23: "LOOKUP",
+	24: "LOOKUP_RESPONSE",
+	25: "CONSUMER_STATS",
+	26: "CONSUMER_STATS_RESPONSE",
+	27: "REACHED_END_OF_TOPIC",
+	28: "SEEK",
+	29: "GET_LAST_MESSAGE_ID",
+	30: "GET_LAST_MESSAGE_ID_RESPONSE",
+	31: "ACTIVE_CONSUMER_CHANGE",
+	32: "GET_TOPICS_OF_NAMESPACE",
+	33: "GET_TOPICS_OF_NAMESPACE_RESPONSE",
+	34: "GET_SCHEMA",
+	35: "GET_SCHEMA_RESPONSE",
+}
+var BaseCommand_Type_value = map[string]int32{ // enum name -> number
+	"CONNECT":                           2,
+	"CONNECTED":                         3,
+	"SUBSCRIBE":                         4,
+	"PRODUCER":                          5,
+	"SEND":                              6,
+	"SEND_RECEIPT":                      7,
+	"SEND_ERROR":                        8,
+	"MESSAGE":                           9,
+	"ACK":                               10,
+	"FLOW":                              11,
+	"UNSUBSCRIBE":                       12,
+	"SUCCESS":                           13,
+	"ERROR":                             14,
+	"CLOSE_PRODUCER":                    15,
+	"CLOSE_CONSUMER":                    16,
+	"PRODUCER_SUCCESS":                  17,
+	"PING":                              18,
+	"PONG":                              19,
+	"REDELIVER_UNACKNOWLEDGED_MESSAGES": 20,
+	"PARTITIONED_METADATA":              21,
+	"PARTITIONED_METADATA_RESPONSE":     22,
+	"LOOKUP":                            23,
+	"LOOKUP_RESPONSE":                   24,
+	"CONSUMER_STATS":                    25,
+	"CONSUMER_STATS_RESPONSE":           26,
+	"REACHED_END_OF_TOPIC":              27,
+	"SEEK":                              28,
+	"GET_LAST_MESSAGE_ID":               29,
+	"GET_LAST_MESSAGE_ID_RESPONSE":      30,
+	"ACTIVE_CONSUMER_CHANGE":            31,
+	"GET_TOPICS_OF_NAMESPACE":           32,
+	"GET_TOPICS_OF_NAMESPACE_RESPONSE":  33,
+	"GET_SCHEMA":                        34,
+	"GET_SCHEMA_RESPONSE":               35,
+}
+
+func (x BaseCommand_Type) Enum() *BaseCommand_Type { // returns a pointer to a newly allocated copy of x
+	p := new(BaseCommand_Type)
+	*p = x
+	return p
+}
+func (x BaseCommand_Type) String() string { // looks x up in BaseCommand_Type_name via proto.EnumName
+	return proto.EnumName(BaseCommand_Type_name, int32(x))
+}
+func (x *BaseCommand_Type) UnmarshalJSON(data []byte) error { // parses data against BaseCommand_Type_value; *x is only written on success
+	value, err := proto.UnmarshalJSONEnum(BaseCommand_Type_value, data, "BaseCommand_Type")
+	if err != nil {
+		return err
+	}
+	*x = BaseCommand_Type(value)
+	return nil
+}
+func (BaseCommand_Type) EnumDescriptor() ([]byte, []int) { // returns the file descriptor variable together with the path []int{41, 0}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{41, 0}
+}
+
+type Schema struct { // Generated message struct; wire types, field numbers and field names are recorded in the protobuf struct tags.
+	Name                 *string      `protobuf:"bytes,1,req,name=name" json:"name,omitempty"`
+	SchemaData           []byte       `protobuf:"bytes,3,req,name=schema_data,json=schemaData" json:"schema_data,omitempty"`
+	Type                 *Schema_Type `protobuf:"varint,4,req,name=type,enum=pulsar.proto.Schema_Type" json:"type,omitempty"`
+	Properties           []*KeyValue  `protobuf:"bytes,5,rep,name=properties" json:"properties,omitempty"`
+	XXX_NoUnkeyedLiteral struct{}     `json:"-"`
+	XXX_unrecognized     []byte       `json:"-"`
+	XXX_sizecache        int32        `json:"-"`
+}
+
+func (m *Schema) Reset()         { *m = Schema{} }
+func (m *Schema) String() string { return proto.CompactTextString(m) }
+func (*Schema) ProtoMessage()    {}
+func (*Schema) Descriptor() ([]byte, []int) { // returns the file descriptor variable together with the path []int{0}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{0}
+}
+func (m *Schema) XXX_Unmarshal(b []byte) error { // delegates to xxx_messageInfo_Schema
+	return xxx_messageInfo_Schema.Unmarshal(m, b)
+}
+func (m *Schema) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { // delegates to xxx_messageInfo_Schema
+	return xxx_messageInfo_Schema.Marshal(b, m, deterministic)
+}
+func (dst *Schema) XXX_Merge(src proto.Message) { // delegates to xxx_messageInfo_Schema, merging src into dst
+	xxx_messageInfo_Schema.Merge(dst, src)
+}
+func (m *Schema) XXX_Size() int { // delegates to xxx_messageInfo_Schema
+	return xxx_messageInfo_Schema.Size(m)
+}
+func (m *Schema) XXX_DiscardUnknown() { // delegates to xxx_messageInfo_Schema
+	xxx_messageInfo_Schema.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Schema proto.InternalMessageInfo // backs the XXX_* methods of Schema
+
+func (m *Schema) GetName() string { // nil-safe: returns "" when m or m.Name is nil
+	if m != nil && m.Name != nil {
+		return *m.Name
+	}
+	return ""
+}
+
+func (m *Schema) GetSchemaData() []byte { // nil-safe: returns nil when m is nil
+	if m != nil {
+		return m.SchemaData
+	}
+	return nil
+}
+
+func (m *Schema) GetType() Schema_Type { // nil-safe: returns Schema_None when m or m.Type is nil
+	if m != nil && m.Type != nil {
+		return *m.Type
+	}
+	return Schema_None
+}
+
+func (m *Schema) GetProperties() []*KeyValue { // nil-safe: returns nil when m is nil
+	if m != nil {
+		return m.Properties
+	}
+	return nil
+}
+
+type MessageIdData struct { // Generated message struct; wire types, field numbers, names and defaults are recorded in the protobuf struct tags.
+	LedgerId             *uint64  `protobuf:"varint,1,req,name=ledgerId" json:"ledgerId,omitempty"`
+	EntryId              *uint64  `protobuf:"varint,2,req,name=entryId" json:"entryId,omitempty"`
+	Partition            *int32   `protobuf:"varint,3,opt,name=partition,def=-1" json:"partition,omitempty"`
+	BatchIndex           *int32   `protobuf:"varint,4,opt,name=batch_index,json=batchIndex,def=-1" json:"batch_index,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *MessageIdData) Reset()         { *m = MessageIdData{} }
+func (m *MessageIdData) String() string { return proto.CompactTextString(m) }
+func (*MessageIdData) ProtoMessage()    {}
+func (*MessageIdData) Descriptor() ([]byte, []int) { // returns the file descriptor variable together with the path []int{1}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{1}
+}
+func (m *MessageIdData) XXX_Unmarshal(b []byte) error { // delegates to xxx_messageInfo_MessageIdData
+	return xxx_messageInfo_MessageIdData.Unmarshal(m, b)
+}
+func (m *MessageIdData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { // delegates to xxx_messageInfo_MessageIdData
+	return xxx_messageInfo_MessageIdData.Marshal(b, m, deterministic)
+}
+func (dst *MessageIdData) XXX_Merge(src proto.Message) { // delegates to xxx_messageInfo_MessageIdData, merging src into dst
+	xxx_messageInfo_MessageIdData.Merge(dst, src)
+}
+func (m *MessageIdData) XXX_Size() int { // delegates to xxx_messageInfo_MessageIdData
+	return xxx_messageInfo_MessageIdData.Size(m)
+}
+func (m *MessageIdData) XXX_DiscardUnknown() { // delegates to xxx_messageInfo_MessageIdData
+	xxx_messageInfo_MessageIdData.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_MessageIdData proto.InternalMessageInfo // backs the XXX_* methods of MessageIdData
+
+const Default_MessageIdData_Partition int32 = -1  // returned by GetPartition when the field is unset
+const Default_MessageIdData_BatchIndex int32 = -1 // returned by GetBatchIndex when the field is unset
+
+func (m *MessageIdData) GetLedgerId() uint64 { // nil-safe: returns 0 when m or m.LedgerId is nil
+	if m != nil && m.LedgerId != nil {
+		return *m.LedgerId
+	}
+	return 0
+}
+
+func (m *MessageIdData) GetEntryId() uint64 { // nil-safe: returns 0 when m or m.EntryId is nil
+	if m != nil && m.EntryId != nil {
+		return *m.EntryId
+	}
+	return 0
+}
+
+func (m *MessageIdData) GetPartition() int32 { // nil-safe: falls back to Default_MessageIdData_Partition (-1)
+	if m != nil && m.Partition != nil {
+		return *m.Partition
+	}
+	return Default_MessageIdData_Partition
+}
+
+func (m *MessageIdData) GetBatchIndex() int32 { // nil-safe: falls back to Default_MessageIdData_BatchIndex (-1)
+	if m != nil && m.BatchIndex != nil {
+		return *m.BatchIndex
+	}
+	return Default_MessageIdData_BatchIndex
+}
+
+type KeyValue struct { // Generated message struct holding a string key and a string value; wire details are in the protobuf struct tags.
+	Key                  *string  `protobuf:"bytes,1,req,name=key" json:"key,omitempty"`
+	Value                *string  `protobuf:"bytes,2,req,name=value" json:"value,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *KeyValue) Reset()         { *m = KeyValue{} }
+func (m *KeyValue) String() string { return proto.CompactTextString(m) }
+func (*KeyValue) ProtoMessage()    {}
+func (*KeyValue) Descriptor() ([]byte, []int) { // returns the file descriptor variable together with the path []int{2}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{2}
+}
+func (m *KeyValue) XXX_Unmarshal(b []byte) error { // delegates to xxx_messageInfo_KeyValue
+	return xxx_messageInfo_KeyValue.Unmarshal(m, b)
+}
+func (m *KeyValue) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { // delegates to xxx_messageInfo_KeyValue
+	return xxx_messageInfo_KeyValue.Marshal(b, m, deterministic)
+}
+func (dst *KeyValue) XXX_Merge(src proto.Message) { // delegates to xxx_messageInfo_KeyValue, merging src into dst
+	xxx_messageInfo_KeyValue.Merge(dst, src)
+}
+func (m *KeyValue) XXX_Size() int { // delegates to xxx_messageInfo_KeyValue
+	return xxx_messageInfo_KeyValue.Size(m)
+}
+func (m *KeyValue) XXX_DiscardUnknown() { // delegates to xxx_messageInfo_KeyValue
+	xxx_messageInfo_KeyValue.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_KeyValue proto.InternalMessageInfo // backs the XXX_* methods of KeyValue
+
+func (m *KeyValue) GetKey() string { // nil-safe: returns "" when m or m.Key is nil
+	if m != nil && m.Key != nil {
+		return *m.Key
+	}
+	return ""
+}
+
+func (m *KeyValue) GetValue() string { // nil-safe: returns "" when m or m.Value is nil
+	if m != nil && m.Value != nil {
+		return *m.Value
+	}
+	return ""
+}
+
+type KeyLongValue struct { // Generated message struct holding a string key and a uint64 value; wire details are in the protobuf struct tags.
+	Key                  *string  `protobuf:"bytes,1,req,name=key" json:"key,omitempty"`
+	Value                *uint64  `protobuf:"varint,2,req,name=value" json:"value,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *KeyLongValue) Reset()         { *m = KeyLongValue{} }
+func (m *KeyLongValue) String() string { return proto.CompactTextString(m) }
+func (*KeyLongValue) ProtoMessage()    {}
+func (*KeyLongValue) Descriptor() ([]byte, []int) { // returns the file descriptor variable together with the path []int{3}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{3}
+}
+func (m *KeyLongValue) XXX_Unmarshal(b []byte) error { // delegates to xxx_messageInfo_KeyLongValue
+	return xxx_messageInfo_KeyLongValue.Unmarshal(m, b)
+}
+func (m *KeyLongValue) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { // delegates to xxx_messageInfo_KeyLongValue
+	return xxx_messageInfo_KeyLongValue.Marshal(b, m, deterministic)
+}
+func (dst *KeyLongValue) XXX_Merge(src proto.Message) { // delegates to xxx_messageInfo_KeyLongValue, merging src into dst
+	xxx_messageInfo_KeyLongValue.Merge(dst, src)
+}
+func (m *KeyLongValue) XXX_Size() int { // delegates to xxx_messageInfo_KeyLongValue
+	return xxx_messageInfo_KeyLongValue.Size(m)
+}
+func (m *KeyLongValue) XXX_DiscardUnknown() { // delegates to xxx_messageInfo_KeyLongValue
+	xxx_messageInfo_KeyLongValue.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_KeyLongValue proto.InternalMessageInfo // backs the XXX_* methods of KeyLongValue
+
+func (m *KeyLongValue) GetKey() string { // nil-safe: returns "" when m or m.Key is nil
+	if m != nil && m.Key != nil {
+		return *m.Key
+	}
+	return ""
+}
+
+func (m *KeyLongValue) GetValue() uint64 { // nil-safe: returns 0 when m or m.Value is nil
+	if m != nil && m.Value != nil {
+		return *m.Value
+	}
+	return 0
+}
+
+type EncryptionKeys struct { // Generated message struct: a string key, a []byte value and a list of KeyValue metadata; wire details are in the struct tags.
+	Key                  *string     `protobuf:"bytes,1,req,name=key" json:"key,omitempty"`
+	Value                []byte      `protobuf:"bytes,2,req,name=value" json:"value,omitempty"`
+	Metadata             []*KeyValue `protobuf:"bytes,3,rep,name=metadata" json:"metadata,omitempty"`
+	XXX_NoUnkeyedLiteral struct{}    `json:"-"`
+	XXX_unrecognized     []byte      `json:"-"`
+	XXX_sizecache        int32       `json:"-"`
+}
+
+func (m *EncryptionKeys) Reset()         { *m = EncryptionKeys{} }
+func (m *EncryptionKeys) String() string { return proto.CompactTextString(m) }
+func (*EncryptionKeys) ProtoMessage()    {}
+func (*EncryptionKeys) Descriptor() ([]byte, []int) { // returns the file descriptor variable together with the path []int{4}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{4}
+}
+func (m *EncryptionKeys) XXX_Unmarshal(b []byte) error { // delegates to xxx_messageInfo_EncryptionKeys
+	return xxx_messageInfo_EncryptionKeys.Unmarshal(m, b)
+}
+func (m *EncryptionKeys) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { // delegates to xxx_messageInfo_EncryptionKeys
+	return xxx_messageInfo_EncryptionKeys.Marshal(b, m, deterministic)
+}
+func (dst *EncryptionKeys) XXX_Merge(src proto.Message) { // delegates to xxx_messageInfo_EncryptionKeys, merging src into dst
+	xxx_messageInfo_EncryptionKeys.Merge(dst, src)
+}
+func (m *EncryptionKeys) XXX_Size() int { // delegates to xxx_messageInfo_EncryptionKeys
+	return xxx_messageInfo_EncryptionKeys.Size(m)
+}
+func (m *EncryptionKeys) XXX_DiscardUnknown() { // delegates to xxx_messageInfo_EncryptionKeys
+	xxx_messageInfo_EncryptionKeys.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_EncryptionKeys proto.InternalMessageInfo // backs the XXX_* methods of EncryptionKeys
+
+func (m *EncryptionKeys) GetKey() string { // nil-safe: returns "" when m or m.Key is nil
+	if m != nil && m.Key != nil {
+		return *m.Key
+	}
+	return ""
+}
+
+func (m *EncryptionKeys) GetValue() []byte { // nil-safe: returns nil when m is nil
+	if m != nil {
+		return m.Value
+	}
+	return nil
+}
+
+func (m *EncryptionKeys) GetMetadata() []*KeyValue { // nil-safe: returns nil when m is nil
+	if m != nil {
+		return m.Metadata
+	}
+	return nil
+}
+
+// MessageMetadata is a generated protobuf message type (see the struct tags).
+// ProducerName, SequenceId and PublishTime are tagged "req"; the remaining
+// tagged fields are optional ("opt") or repeated ("rep"). Optional scalars are
+// held as pointers, and the Get* accessors fall back to a default when the
+// pointer is nil. The XXX_ fields carry no protobuf tag and are excluded from
+// JSON output (`json:"-"`).
+type MessageMetadata struct {
+	ProducerName *string     `protobuf:"bytes,1,req,name=producer_name,json=producerName" json:"producer_name,omitempty"`
+	SequenceId   *uint64     `protobuf:"varint,2,req,name=sequence_id,json=sequenceId" json:"sequence_id,omitempty"`
+	PublishTime  *uint64     `protobuf:"varint,3,req,name=publish_time,json=publishTime" json:"publish_time,omitempty"`
+	Properties   []*KeyValue `protobuf:"bytes,4,rep,name=properties" json:"properties,omitempty"`
+	// Property set on replicated message,
+	// includes the source cluster name
+	ReplicatedFrom *string `protobuf:"bytes,5,opt,name=replicated_from,json=replicatedFrom" json:"replicated_from,omitempty"`
+	// key to decide partition for the msg
+	PartitionKey *string `protobuf:"bytes,6,opt,name=partition_key,json=partitionKey" json:"partition_key,omitempty"`
+	// Override namespace's replication
+	ReplicateTo      []string         `protobuf:"bytes,7,rep,name=replicate_to,json=replicateTo" json:"replicate_to,omitempty"`
+	Compression      *CompressionType `protobuf:"varint,8,opt,name=compression,enum=pulsar.proto.CompressionType,def=0" json:"compression,omitempty"`
+	UncompressedSize *uint32          `protobuf:"varint,9,opt,name=uncompressed_size,json=uncompressedSize,def=0" json:"uncompressed_size,omitempty"`
+	// Removed below checksum field from Metadata as
+	// it should be part of send-command which keeps checksum of header + payload
+	// optional sfixed64 checksum = 10;
+	// differentiate single and batch message metadata
+	NumMessagesInBatch *int32 `protobuf:"varint,11,opt,name=num_messages_in_batch,json=numMessagesInBatch,def=1" json:"num_messages_in_batch,omitempty"`
+	// the timestamp that this event occurs. it is typically set by applications.
+	// if this field is omitted, `publish_time` can be used for the purpose of `event_time`.
+	EventTime *uint64 `protobuf:"varint,12,opt,name=event_time,json=eventTime,def=0" json:"event_time,omitempty"`
+	// Contains encryption key name, encrypted key and metadata to describe the key
+	EncryptionKeys []*EncryptionKeys `protobuf:"bytes,13,rep,name=encryption_keys,json=encryptionKeys" json:"encryption_keys,omitempty"`
+	// Algorithm used to encrypt data key
+	EncryptionAlgo *string `protobuf:"bytes,14,opt,name=encryption_algo,json=encryptionAlgo" json:"encryption_algo,omitempty"`
+	// Additional parameters required by encryption
+	EncryptionParam        []byte   `protobuf:"bytes,15,opt,name=encryption_param,json=encryptionParam" json:"encryption_param,omitempty"`
+	SchemaVersion          []byte   `protobuf:"bytes,16,opt,name=schema_version,json=schemaVersion" json:"schema_version,omitempty"`
+	PartitionKeyB64Encoded *bool    `protobuf:"varint,17,opt,name=partition_key_b64_encoded,json=partitionKeyB64Encoded,def=0" json:"partition_key_b64_encoded,omitempty"`
+	XXX_NoUnkeyedLiteral   struct{} `json:"-"`
+	XXX_unrecognized       []byte   `json:"-"`
+	XXX_sizecache          int32    `json:"-"`
+}
+
+// Reset overwrites the receiver with a zero-valued MessageMetadata.
+func (m *MessageMetadata) Reset() {
+	*m = MessageMetadata{}
+}
+
+// String returns the result of proto.CompactTextString for the receiver.
+func (m *MessageMetadata) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage has an empty body; it does nothing when called.
+func (*MessageMetadata) ProtoMessage() {}
+
+// Descriptor returns fileDescriptor_PulsarApi_9ff3bd5e091a4809 together with
+// the index path []int{5}.
+func (*MessageMetadata) Descriptor() ([]byte, []int) {
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{5}
+}
+
+// XXX_Unmarshal forwards to xxx_messageInfo_MessageMetadata.Unmarshal with the
+// receiver and b.
+func (m *MessageMetadata) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_MessageMetadata.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to xxx_messageInfo_MessageMetadata.Marshal, passing b,
+// the receiver and the deterministic flag.
+func (m *MessageMetadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_MessageMetadata.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to xxx_messageInfo_MessageMetadata.Merge with the receiver
+// as destination and src as source.
+func (m *MessageMetadata) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_MessageMetadata.Merge(m, src)
+}
+
+// XXX_Size forwards to xxx_messageInfo_MessageMetadata.Size for the receiver.
+func (m *MessageMetadata) XXX_Size() int {
+	return xxx_messageInfo_MessageMetadata.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to xxx_messageInfo_MessageMetadata.DiscardUnknown
+// for the receiver.
+func (m *MessageMetadata) XXX_DiscardUnknown() {
+	xxx_messageInfo_MessageMetadata.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_MessageMetadata is the proto.InternalMessageInfo used by the
+// XXX_ methods of MessageMetadata.
+var xxx_messageInfo_MessageMetadata proto.InternalMessageInfo
+
+// Fallback values returned by the MessageMetadata getters when the
+// corresponding optional field is unset.
+const Default_MessageMetadata_Compression CompressionType = CompressionType_NONE
+const Default_MessageMetadata_UncompressedSize uint32 = 0
+const Default_MessageMetadata_NumMessagesInBatch int32 = 1
+const Default_MessageMetadata_EventTime uint64 = 0
+const Default_MessageMetadata_PartitionKeyB64Encoded bool = false
+
+// GetProducerName returns *m.ProducerName, or "" if m or the field is nil.
+func (m *MessageMetadata) GetProducerName() string {
+	if m == nil || m.ProducerName == nil {
+		return ""
+	}
+	return *m.ProducerName
+}
+
+// GetSequenceId returns *m.SequenceId, or 0 if m or the field is nil.
+func (m *MessageMetadata) GetSequenceId() uint64 {
+	if m == nil || m.SequenceId == nil {
+		return 0
+	}
+	return *m.SequenceId
+}
+
+// GetPublishTime returns *m.PublishTime, or 0 if m or the field is nil.
+func (m *MessageMetadata) GetPublishTime() uint64 {
+	if m == nil || m.PublishTime == nil {
+		return 0
+	}
+	return *m.PublishTime
+}
+
+// GetProperties returns m.Properties, or nil if m is nil.
+func (m *MessageMetadata) GetProperties() []*KeyValue {
+	if m == nil {
+		return nil
+	}
+	return m.Properties
+}
+
+// GetReplicatedFrom returns *m.ReplicatedFrom, or "" if m or the field is nil.
+func (m *MessageMetadata) GetReplicatedFrom() string {
+	if m == nil || m.ReplicatedFrom == nil {
+		return ""
+	}
+	return *m.ReplicatedFrom
+}
+
+// GetPartitionKey returns *m.PartitionKey, or "" if m or the field is nil.
+func (m *MessageMetadata) GetPartitionKey() string {
+	if m == nil || m.PartitionKey == nil {
+		return ""
+	}
+	return *m.PartitionKey
+}
+
+// GetReplicateTo returns m.ReplicateTo, or nil if m is nil.
+func (m *MessageMetadata) GetReplicateTo() []string {
+	if m == nil {
+		return nil
+	}
+	return m.ReplicateTo
+}
+
+// GetCompression returns *m.Compression, or
+// Default_MessageMetadata_Compression if m or the field is nil.
+func (m *MessageMetadata) GetCompression() CompressionType {
+	if m == nil || m.Compression == nil {
+		return Default_MessageMetadata_Compression
+	}
+	return *m.Compression
+}
+
+// GetUncompressedSize returns *m.UncompressedSize, or
+// Default_MessageMetadata_UncompressedSize if m or the field is nil.
+func (m *MessageMetadata) GetUncompressedSize() uint32 {
+	if m == nil || m.UncompressedSize == nil {
+		return Default_MessageMetadata_UncompressedSize
+	}
+	return *m.UncompressedSize
+}
+
+// GetNumMessagesInBatch returns *m.NumMessagesInBatch, or
+// Default_MessageMetadata_NumMessagesInBatch (1) if m or the field is nil.
+func (m *MessageMetadata) GetNumMessagesInBatch() int32 {
+	if m == nil || m.NumMessagesInBatch == nil {
+		return Default_MessageMetadata_NumMessagesInBatch
+	}
+	return *m.NumMessagesInBatch
+}
+
+// GetEventTime returns *m.EventTime, or Default_MessageMetadata_EventTime if
+// m or the field is nil.
+func (m *MessageMetadata) GetEventTime() uint64 {
+	if m == nil || m.EventTime == nil {
+		return Default_MessageMetadata_EventTime
+	}
+	return *m.EventTime
+}
+
+// GetEncryptionKeys returns m.EncryptionKeys, or nil if m is nil.
+func (m *MessageMetadata) GetEncryptionKeys() []*EncryptionKeys {
+	if m == nil {
+		return nil
+	}
+	return m.EncryptionKeys
+}
+
+// GetEncryptionAlgo returns *m.EncryptionAlgo, or "" if m or the field is nil.
+func (m *MessageMetadata) GetEncryptionAlgo() string {
+	if m == nil || m.EncryptionAlgo == nil {
+		return ""
+	}
+	return *m.EncryptionAlgo
+}
+
+// GetEncryptionParam returns m.EncryptionParam, or nil if m is nil.
+func (m *MessageMetadata) GetEncryptionParam() []byte {
+	if m == nil {
+		return nil
+	}
+	return m.EncryptionParam
+}
+
+// GetSchemaVersion returns m.SchemaVersion, or nil if m is nil.
+func (m *MessageMetadata) GetSchemaVersion() []byte {
+	if m == nil {
+		return nil
+	}
+	return m.SchemaVersion
+}
+
+// GetPartitionKeyB64Encoded returns *m.PartitionKeyB64Encoded, or
+// Default_MessageMetadata_PartitionKeyB64Encoded if m or the field is nil.
+func (m *MessageMetadata) GetPartitionKeyB64Encoded() bool {
+	if m == nil || m.PartitionKeyB64Encoded == nil {
+		return Default_MessageMetadata_PartitionKeyB64Encoded
+	}
+	return *m.PartitionKeyB64Encoded
+}
+
+// SingleMessageMetadata is a generated protobuf message type (see the struct
+// tags). PayloadSize is the only field tagged "req"; the other tagged fields
+// are optional or repeated. The XXX_ fields carry no protobuf tag and are
+// excluded from JSON output.
+type SingleMessageMetadata struct {
+	Properties   []*KeyValue `protobuf:"bytes,1,rep,name=properties" json:"properties,omitempty"`
+	PartitionKey *string     `protobuf:"bytes,2,opt,name=partition_key,json=partitionKey" json:"partition_key,omitempty"`
+	PayloadSize  *int32      `protobuf:"varint,3,req,name=payload_size,json=payloadSize" json:"payload_size,omitempty"`
+	CompactedOut *bool       `protobuf:"varint,4,opt,name=compacted_out,json=compactedOut,def=0" json:"compacted_out,omitempty"`
+	// the timestamp that this event occurs. it is typically set by applications.
+	// if this field is omitted, `publish_time` can be used for the purpose of `event_time`.
+	EventTime              *uint64  `protobuf:"varint,5,opt,name=event_time,json=eventTime,def=0" json:"event_time,omitempty"`
+	PartitionKeyB64Encoded *bool    `protobuf:"varint,6,opt,name=partition_key_b64_encoded,json=partitionKeyB64Encoded,def=0" json:"partition_key_b64_encoded,omitempty"`
+	XXX_NoUnkeyedLiteral   struct{} `json:"-"`
+	XXX_unrecognized       []byte   `json:"-"`
+	XXX_sizecache          int32    `json:"-"`
+}
+
+// Reset overwrites the receiver with a zero-valued SingleMessageMetadata.
+func (m *SingleMessageMetadata) Reset() {
+	*m = SingleMessageMetadata{}
+}
+
+// String returns the result of proto.CompactTextString for the receiver.
+func (m *SingleMessageMetadata) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage has an empty body; it does nothing when called.
+func (*SingleMessageMetadata) ProtoMessage() {}
+
+// Descriptor returns fileDescriptor_PulsarApi_9ff3bd5e091a4809 together with
+// the index path []int{6}.
+func (*SingleMessageMetadata) Descriptor() ([]byte, []int) {
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{6}
+}
+
+// XXX_Unmarshal forwards to xxx_messageInfo_SingleMessageMetadata.Unmarshal
+// with the receiver and b.
+func (m *SingleMessageMetadata) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_SingleMessageMetadata.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to xxx_messageInfo_SingleMessageMetadata.Marshal,
+// passing b, the receiver and the deterministic flag.
+func (m *SingleMessageMetadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_SingleMessageMetadata.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to xxx_messageInfo_SingleMessageMetadata.Merge with the
+// receiver as destination and src as source.
+func (m *SingleMessageMetadata) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_SingleMessageMetadata.Merge(m, src)
+}
+
+// XXX_Size forwards to xxx_messageInfo_SingleMessageMetadata.Size for the
+// receiver.
+func (m *SingleMessageMetadata) XXX_Size() int {
+	return xxx_messageInfo_SingleMessageMetadata.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to
+// xxx_messageInfo_SingleMessageMetadata.DiscardUnknown for the receiver.
+func (m *SingleMessageMetadata) XXX_DiscardUnknown() {
+	xxx_messageInfo_SingleMessageMetadata.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_SingleMessageMetadata is the proto.InternalMessageInfo used
+// by the XXX_ methods of SingleMessageMetadata.
+var xxx_messageInfo_SingleMessageMetadata proto.InternalMessageInfo
+
+// Fallback values returned by the SingleMessageMetadata getters when the
+// corresponding optional field is unset.
+const Default_SingleMessageMetadata_CompactedOut bool = false
+const Default_SingleMessageMetadata_EventTime uint64 = 0
+const Default_SingleMessageMetadata_PartitionKeyB64Encoded bool = false
+
+// GetProperties returns m.Properties, or nil if m is nil.
+func (m *SingleMessageMetadata) GetProperties() []*KeyValue {
+	if m == nil {
+		return nil
+	}
+	return m.Properties
+}
+
+// GetPartitionKey returns *m.PartitionKey, or "" if m or the field is nil.
+func (m *SingleMessageMetadata) GetPartitionKey() string {
+	if m == nil || m.PartitionKey == nil {
+		return ""
+	}
+	return *m.PartitionKey
+}
+
+// GetPayloadSize returns *m.PayloadSize, or 0 if m or the field is nil.
+func (m *SingleMessageMetadata) GetPayloadSize() int32 {
+	if m == nil || m.PayloadSize == nil {
+		return 0
+	}
+	return *m.PayloadSize
+}
+
+// GetCompactedOut returns *m.CompactedOut, or
+// Default_SingleMessageMetadata_CompactedOut if m or the field is nil.
+func (m *SingleMessageMetadata) GetCompactedOut() bool {
+	if m == nil || m.CompactedOut == nil {
+		return Default_SingleMessageMetadata_CompactedOut
+	}
+	return *m.CompactedOut
+}
+
+// GetEventTime returns *m.EventTime, or
+// Default_SingleMessageMetadata_EventTime if m or the field is nil.
+func (m *SingleMessageMetadata) GetEventTime() uint64 {
+	if m == nil || m.EventTime == nil {
+		return Default_SingleMessageMetadata_EventTime
+	}
+	return *m.EventTime
+}
+
+// GetPartitionKeyB64Encoded returns *m.PartitionKeyB64Encoded, or
+// Default_SingleMessageMetadata_PartitionKeyB64Encoded if m or the field is nil.
+func (m *SingleMessageMetadata) GetPartitionKeyB64Encoded() bool {
+	if m == nil || m.PartitionKeyB64Encoded == nil {
+		return Default_SingleMessageMetadata_PartitionKeyB64Encoded
+	}
+	return *m.PartitionKeyB64Encoded
+}
+
+// CommandConnect is a generated protobuf message type (see the struct tags).
+// ClientVersion is the only field tagged "req"; every other tagged field is
+// optional. Note that AuthMethodName carries field number 5 although it is
+// declared before AuthData (3) and ProtocolVersion (4). The XXX_ fields carry
+// no protobuf tag and are excluded from JSON output.
+type CommandConnect struct {
+	ClientVersion   *string     `protobuf:"bytes,1,req,name=client_version,json=clientVersion" json:"client_version,omitempty"`
+	AuthMethod      *AuthMethod `protobuf:"varint,2,opt,name=auth_method,json=authMethod,enum=pulsar.proto.AuthMethod" json:"auth_method,omitempty"`
+	AuthMethodName  *string     `protobuf:"bytes,5,opt,name=auth_method_name,json=authMethodName" json:"auth_method_name,omitempty"`
+	AuthData        []byte      `protobuf:"bytes,3,opt,name=auth_data,json=authData" json:"auth_data,omitempty"`
+	ProtocolVersion *int32      `protobuf:"varint,4,opt,name=protocol_version,json=protocolVersion,def=0" json:"protocol_version,omitempty"`
+	// Client can ask to be proxied to a specific broker
+	// This is only honored by a Pulsar proxy
+	ProxyToBrokerUrl *string `protobuf:"bytes,6,opt,name=proxy_to_broker_url,json=proxyToBrokerUrl" json:"proxy_to_broker_url,omitempty"`
+	// Original principal that was verified by
+	// a Pulsar proxy. In this case the auth info above
+	// will be the auth of the proxy itself
+	OriginalPrincipal *string `protobuf:"bytes,7,opt,name=original_principal,json=originalPrincipal" json:"original_principal,omitempty"`
+	// Original auth role and auth Method that was passed
+	// to the proxy. In this case the auth info above
+	// will be the auth of the proxy itself
+	OriginalAuthData     *string  `protobuf:"bytes,8,opt,name=original_auth_data,json=originalAuthData" json:"original_auth_data,omitempty"`
+	OriginalAuthMethod   *string  `protobuf:"bytes,9,opt,name=original_auth_method,json=originalAuthMethod" json:"original_auth_method,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+// Reset overwrites the receiver with a zero-valued CommandConnect.
+func (m *CommandConnect) Reset() {
+	*m = CommandConnect{}
+}
+
+// String returns the result of proto.CompactTextString for the receiver.
+func (m *CommandConnect) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage has an empty body; it does nothing when called.
+func (*CommandConnect) ProtoMessage() {}
+
+// Descriptor returns fileDescriptor_PulsarApi_9ff3bd5e091a4809 together with
+// the index path []int{7}.
+func (*CommandConnect) Descriptor() ([]byte, []int) {
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{7}
+}
+
+// XXX_Unmarshal forwards to xxx_messageInfo_CommandConnect.Unmarshal with the
+// receiver and b.
+func (m *CommandConnect) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandConnect.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to xxx_messageInfo_CommandConnect.Marshal, passing b,
+// the receiver and the deterministic flag.
+func (m *CommandConnect) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandConnect.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to xxx_messageInfo_CommandConnect.Merge with the receiver
+// as destination and src as source.
+func (m *CommandConnect) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandConnect.Merge(m, src)
+}
+
+// XXX_Size forwards to xxx_messageInfo_CommandConnect.Size for the receiver.
+func (m *CommandConnect) XXX_Size() int {
+	return xxx_messageInfo_CommandConnect.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to xxx_messageInfo_CommandConnect.DiscardUnknown
+// for the receiver.
+func (m *CommandConnect) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandConnect.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandConnect is the proto.InternalMessageInfo used by the
+// XXX_ methods of CommandConnect.
+var xxx_messageInfo_CommandConnect proto.InternalMessageInfo
+
+// Default_CommandConnect_ProtocolVersion is returned by GetProtocolVersion
+// when the field is unset.
+const Default_CommandConnect_ProtocolVersion int32 = 0
+
+// GetClientVersion returns *m.ClientVersion, or "" if m or the field is nil.
+func (m *CommandConnect) GetClientVersion() string {
+	if m == nil || m.ClientVersion == nil {
+		return ""
+	}
+	return *m.ClientVersion
+}
+
+// GetAuthMethod returns *m.AuthMethod, or AuthMethod_AuthMethodNone if m or
+// the field is nil.
+func (m *CommandConnect) GetAuthMethod() AuthMethod {
+	if m == nil || m.AuthMethod == nil {
+		return AuthMethod_AuthMethodNone
+	}
+	return *m.AuthMethod
+}
+
+// GetAuthMethodName returns *m.AuthMethodName, or "" if m or the field is nil.
+func (m *CommandConnect) GetAuthMethodName() string {
+	if m == nil || m.AuthMethodName == nil {
+		return ""
+	}
+	return *m.AuthMethodName
+}
+
+// GetAuthData returns m.AuthData, or nil if m is nil.
+func (m *CommandConnect) GetAuthData() []byte {
+	if m == nil {
+		return nil
+	}
+	return m.AuthData
+}
+
+// GetProtocolVersion returns *m.ProtocolVersion, or
+// Default_CommandConnect_ProtocolVersion if m or the field is nil.
+func (m *CommandConnect) GetProtocolVersion() int32 {
+	if m == nil || m.ProtocolVersion == nil {
+		return Default_CommandConnect_ProtocolVersion
+	}
+	return *m.ProtocolVersion
+}
+
+// GetProxyToBrokerUrl returns *m.ProxyToBrokerUrl, or "" if m or the field is nil.
+func (m *CommandConnect) GetProxyToBrokerUrl() string {
+	if m == nil || m.ProxyToBrokerUrl == nil {
+		return ""
+	}
+	return *m.ProxyToBrokerUrl
+}
+
+// GetOriginalPrincipal returns *m.OriginalPrincipal, or "" if m or the field is nil.
+func (m *CommandConnect) GetOriginalPrincipal() string {
+	if m == nil || m.OriginalPrincipal == nil {
+		return ""
+	}
+	return *m.OriginalPrincipal
+}
+
+// GetOriginalAuthData returns *m.OriginalAuthData, or "" if m or the field is nil.
+func (m *CommandConnect) GetOriginalAuthData() string {
+	if m == nil || m.OriginalAuthData == nil {
+		return ""
+	}
+	return *m.OriginalAuthData
+}
+
+// GetOriginalAuthMethod returns *m.OriginalAuthMethod, or "" if m or the field is nil.
+func (m *CommandConnect) GetOriginalAuthMethod() string {
+	if m == nil || m.OriginalAuthMethod == nil {
+		return ""
+	}
+	return *m.OriginalAuthMethod
+}
+
+// CommandConnected is a generated protobuf message type (see the struct tags).
+// ServerVersion is tagged "req"; ProtocolVersion is optional with a declared
+// default of 0. The XXX_ fields carry no protobuf tag and are excluded from
+// JSON output.
+type CommandConnected struct {
+	ServerVersion        *string  `protobuf:"bytes,1,req,name=server_version,json=serverVersion" json:"server_version,omitempty"`
+	ProtocolVersion      *int32   `protobuf:"varint,2,opt,name=protocol_version,json=protocolVersion,def=0" json:"protocol_version,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+// Reset overwrites the receiver with a zero-valued CommandConnected.
+func (m *CommandConnected) Reset() {
+	*m = CommandConnected{}
+}
+
+// String returns the result of proto.CompactTextString for the receiver.
+func (m *CommandConnected) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage has an empty body; it does nothing when called.
+func (*CommandConnected) ProtoMessage() {}
+
+// Descriptor returns fileDescriptor_PulsarApi_9ff3bd5e091a4809 together with
+// the index path []int{8}.
+func (*CommandConnected) Descriptor() ([]byte, []int) {
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{8}
+}
+
+// XXX_Unmarshal forwards to xxx_messageInfo_CommandConnected.Unmarshal with the
+// receiver and b.
+func (m *CommandConnected) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandConnected.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to xxx_messageInfo_CommandConnected.Marshal, passing b,
+// the receiver and the deterministic flag.
+func (m *CommandConnected) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandConnected.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to xxx_messageInfo_CommandConnected.Merge with the
+// receiver as destination and src as source.
+func (m *CommandConnected) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandConnected.Merge(m, src)
+}
+
+// XXX_Size forwards to xxx_messageInfo_CommandConnected.Size for the receiver.
+func (m *CommandConnected) XXX_Size() int {
+	return xxx_messageInfo_CommandConnected.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to
+// xxx_messageInfo_CommandConnected.DiscardUnknown for the receiver.
+func (m *CommandConnected) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandConnected.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandConnected is the proto.InternalMessageInfo used by the
+// XXX_ methods of CommandConnected.
+var xxx_messageInfo_CommandConnected proto.InternalMessageInfo
+
+// Default_CommandConnected_ProtocolVersion is returned by GetProtocolVersion
+// when the field is unset.
+const Default_CommandConnected_ProtocolVersion int32 = 0
+
+// GetServerVersion returns *m.ServerVersion, or "" if m or the field is nil.
+func (m *CommandConnected) GetServerVersion() string {
+	if m == nil || m.ServerVersion == nil {
+		return ""
+	}
+	return *m.ServerVersion
+}
+
+// GetProtocolVersion returns *m.ProtocolVersion, or
+// Default_CommandConnected_ProtocolVersion if m or the field is nil.
+func (m *CommandConnected) GetProtocolVersion() int32 {
+	if m == nil || m.ProtocolVersion == nil {
+		return Default_CommandConnected_ProtocolVersion
+	}
+	return *m.ProtocolVersion
+}
+
+// CommandSubscribe is a generated protobuf message type (see the struct tags).
+// Topic, Subscription, SubType, ConsumerId and RequestId are tagged "req"; the
+// remaining tagged fields are optional or repeated. Durable declares a default
+// of true (def=1) and InitialPosition a default of 0. The XXX_ fields carry no
+// protobuf tag and are excluded from JSON output.
+type CommandSubscribe struct {
+	Topic         *string                   `protobuf:"bytes,1,req,name=topic" json:"topic,omitempty"`
+	Subscription  *string                   `protobuf:"bytes,2,req,name=subscription" json:"subscription,omitempty"`
+	SubType       *CommandSubscribe_SubType `protobuf:"varint,3,req,name=subType,enum=pulsar.proto.CommandSubscribe_SubType" json:"subType,omitempty"`
+	ConsumerId    *uint64                   `protobuf:"varint,4,req,name=consumer_id,json=consumerId" json:"consumer_id,omitempty"`
+	RequestId     *uint64                   `protobuf:"varint,5,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+	ConsumerName  *string                   `protobuf:"bytes,6,opt,name=consumer_name,json=consumerName" json:"consumer_name,omitempty"`
+	PriorityLevel *int32                    `protobuf:"varint,7,opt,name=priority_level,json=priorityLevel" json:"priority_level,omitempty"`
+	// Signal whether the subscription should be backed by a
+	// durable cursor or not
+	Durable *bool `protobuf:"varint,8,opt,name=durable,def=1" json:"durable,omitempty"`
+	// If specified, the subscription will position the cursor
+	// mark-delete position on the particular message id and
+	// will send messages from that point
+	StartMessageId *MessageIdData `protobuf:"bytes,9,opt,name=start_message_id,json=startMessageId" json:"start_message_id,omitempty"`
+	// Add optional metadata key=value to this consumer
+	Metadata      []*KeyValue `protobuf:"bytes,10,rep,name=metadata" json:"metadata,omitempty"`
+	ReadCompacted *bool       `protobuf:"varint,11,opt,name=read_compacted,json=readCompacted" json:"read_compacted,omitempty"`
+	Schema        *Schema     `protobuf:"bytes,12,opt,name=schema" json:"schema,omitempty"`
+	// Signal whether the subscription will initialize on latest
+	// or not -- earliest
+	InitialPosition      *CommandSubscribe_InitialPosition `protobuf:"varint,13,opt,name=initialPosition,enum=pulsar.proto.CommandSubscribe_InitialPosition,def=0" json:"initialPosition,omitempty"`
+	XXX_NoUnkeyedLiteral struct{}                          `json:"-"`
+	XXX_unrecognized     []byte                            `json:"-"`
+	XXX_sizecache        int32                             `json:"-"`
+}
+
+// Reset overwrites the receiver with a zero-valued CommandSubscribe.
+func (m *CommandSubscribe) Reset() {
+	*m = CommandSubscribe{}
+}
+
+// String returns the result of proto.CompactTextString for the receiver.
+func (m *CommandSubscribe) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage has an empty body; it does nothing when called.
+func (*CommandSubscribe) ProtoMessage() {}
+
+// Descriptor returns fileDescriptor_PulsarApi_9ff3bd5e091a4809 together with
+// the index path []int{9}.
+func (*CommandSubscribe) Descriptor() ([]byte, []int) {
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{9}
+}
+
+// XXX_Unmarshal forwards to xxx_messageInfo_CommandSubscribe.Unmarshal with the
+// receiver and b.
+func (m *CommandSubscribe) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandSubscribe.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to xxx_messageInfo_CommandSubscribe.Marshal, passing b,
+// the receiver and the deterministic flag.
+func (m *CommandSubscribe) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandSubscribe.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to xxx_messageInfo_CommandSubscribe.Merge with the
+// receiver as destination and src as source.
+func (m *CommandSubscribe) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandSubscribe.Merge(m, src)
+}
+
+// XXX_Size forwards to xxx_messageInfo_CommandSubscribe.Size for the receiver.
+func (m *CommandSubscribe) XXX_Size() int {
+	return xxx_messageInfo_CommandSubscribe.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to
+// xxx_messageInfo_CommandSubscribe.DiscardUnknown for the receiver.
+func (m *CommandSubscribe) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandSubscribe.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandSubscribe is the proto.InternalMessageInfo used by the
+// XXX_ methods of CommandSubscribe.
+var xxx_messageInfo_CommandSubscribe proto.InternalMessageInfo
+
+// Fallback values returned by GetDurable and GetInitialPosition when the
+// corresponding optional field is unset.
+const Default_CommandSubscribe_Durable bool = true
+const Default_CommandSubscribe_InitialPosition CommandSubscribe_InitialPosition = CommandSubscribe_Latest
+
+// GetTopic returns *m.Topic, or "" if m or the field is nil.
+func (m *CommandSubscribe) GetTopic() string {
+	if m == nil || m.Topic == nil {
+		return ""
+	}
+	return *m.Topic
+}
+
+// GetSubscription returns *m.Subscription, or "" if m or the field is nil.
+func (m *CommandSubscribe) GetSubscription() string {
+	if m == nil || m.Subscription == nil {
+		return ""
+	}
+	return *m.Subscription
+}
+
+// GetSubType returns *m.SubType, or CommandSubscribe_Exclusive if m or the
+// field is nil.
+func (m *CommandSubscribe) GetSubType() CommandSubscribe_SubType {
+	if m == nil || m.SubType == nil {
+		return CommandSubscribe_Exclusive
+	}
+	return *m.SubType
+}
+
+// GetConsumerId returns *m.ConsumerId, or 0 if m or the field is nil.
+func (m *CommandSubscribe) GetConsumerId() uint64 {
+	if m == nil || m.ConsumerId == nil {
+		return 0
+	}
+	return *m.ConsumerId
+}
+
+// GetRequestId returns *m.RequestId, or 0 if m or the field is nil.
+func (m *CommandSubscribe) GetRequestId() uint64 {
+	if m == nil || m.RequestId == nil {
+		return 0
+	}
+	return *m.RequestId
+}
+
+// GetConsumerName returns *m.ConsumerName, or "" if m or the field is nil.
+func (m *CommandSubscribe) GetConsumerName() string {
+	if m == nil || m.ConsumerName == nil {
+		return ""
+	}
+	return *m.ConsumerName
+}
+
+// GetPriorityLevel returns *m.PriorityLevel, or 0 if m or the field is nil.
+func (m *CommandSubscribe) GetPriorityLevel() int32 {
+	if m == nil || m.PriorityLevel == nil {
+		return 0
+	}
+	return *m.PriorityLevel
+}
+
+// GetDurable returns *m.Durable, or Default_CommandSubscribe_Durable (true)
+// if m or the field is nil.
+func (m *CommandSubscribe) GetDurable() bool {
+	if m == nil || m.Durable == nil {
+		return Default_CommandSubscribe_Durable
+	}
+	return *m.Durable
+}
+
+// GetStartMessageId returns m.StartMessageId, or nil if m is nil.
+func (m *CommandSubscribe) GetStartMessageId() *MessageIdData {
+	if m == nil {
+		return nil
+	}
+	return m.StartMessageId
+}
+
+// GetMetadata returns m.Metadata, or nil if m is nil.
+func (m *CommandSubscribe) GetMetadata() []*KeyValue {
+	if m == nil {
+		return nil
+	}
+	return m.Metadata
+}
+
+// GetReadCompacted returns *m.ReadCompacted, or false if m or the field is nil.
+func (m *CommandSubscribe) GetReadCompacted() bool {
+	if m == nil || m.ReadCompacted == nil {
+		return false
+	}
+	return *m.ReadCompacted
+}
+
+// GetSchema returns m.Schema, or nil if m is nil.
+func (m *CommandSubscribe) GetSchema() *Schema {
+	if m == nil {
+		return nil
+	}
+	return m.Schema
+}
+
+// GetInitialPosition returns *m.InitialPosition, or
+// Default_CommandSubscribe_InitialPosition if m or the field is nil.
+func (m *CommandSubscribe) GetInitialPosition() CommandSubscribe_InitialPosition {
+	if m == nil || m.InitialPosition == nil {
+		return Default_CommandSubscribe_InitialPosition
+	}
+	return *m.InitialPosition
+}
+
+// CommandPartitionedTopicMetadata is a generated protobuf message type (see
+// the struct tags). Topic and RequestId are tagged "req"; the three Original*
+// fields are optional. The XXX_ fields carry no protobuf tag and are excluded
+// from JSON output.
+type CommandPartitionedTopicMetadata struct {
+	Topic     *string `protobuf:"bytes,1,req,name=topic" json:"topic,omitempty"`
+	RequestId *uint64 `protobuf:"varint,2,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+	// TODO - Remove original_principal, original_auth_data, original_auth_method
+	// Original principal that was verified by
+	// a Pulsar proxy.
+	OriginalPrincipal *string `protobuf:"bytes,3,opt,name=original_principal,json=originalPrincipal" json:"original_principal,omitempty"`
+	// Original auth role and auth Method that was passed
+	// to the proxy.
+	OriginalAuthData     *string  `protobuf:"bytes,4,opt,name=original_auth_data,json=originalAuthData" json:"original_auth_data,omitempty"`
+	OriginalAuthMethod   *string  `protobuf:"bytes,5,opt,name=original_auth_method,json=originalAuthMethod" json:"original_auth_method,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+// Reset overwrites the receiver with a zero-valued
+// CommandPartitionedTopicMetadata.
+func (m *CommandPartitionedTopicMetadata) Reset() {
+	*m = CommandPartitionedTopicMetadata{}
+}
+
+// String returns the result of proto.CompactTextString for the receiver.
+func (m *CommandPartitionedTopicMetadata) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage has an empty body; it does nothing when called.
+func (*CommandPartitionedTopicMetadata) ProtoMessage() {}
+
+// Descriptor returns fileDescriptor_PulsarApi_9ff3bd5e091a4809 together with
+// the index path []int{10}.
+func (*CommandPartitionedTopicMetadata) Descriptor() ([]byte, []int) {
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{10}
+}
+
+// XXX_Unmarshal forwards to
+// xxx_messageInfo_CommandPartitionedTopicMetadata.Unmarshal with the receiver
+// and b.
+func (m *CommandPartitionedTopicMetadata) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandPartitionedTopicMetadata.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to
+// xxx_messageInfo_CommandPartitionedTopicMetadata.Marshal, passing b, the
+// receiver and the deterministic flag.
+func (m *CommandPartitionedTopicMetadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandPartitionedTopicMetadata.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to xxx_messageInfo_CommandPartitionedTopicMetadata.Merge
+// with the receiver as destination and src as source.
+func (m *CommandPartitionedTopicMetadata) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandPartitionedTopicMetadata.Merge(m, src)
+}
+
+// XXX_Size forwards to xxx_messageInfo_CommandPartitionedTopicMetadata.Size
+// for the receiver.
+func (m *CommandPartitionedTopicMetadata) XXX_Size() int {
+	return xxx_messageInfo_CommandPartitionedTopicMetadata.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to
+// xxx_messageInfo_CommandPartitionedTopicMetadata.DiscardUnknown for the
+// receiver.
+func (m *CommandPartitionedTopicMetadata) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandPartitionedTopicMetadata.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandPartitionedTopicMetadata is the
+// proto.InternalMessageInfo used by the XXX_ methods of
+// CommandPartitionedTopicMetadata.
+var xxx_messageInfo_CommandPartitionedTopicMetadata proto.InternalMessageInfo
+
+// GetTopic returns *m.Topic, or "" if m or the field is nil.
+func (m *CommandPartitionedTopicMetadata) GetTopic() string {
+	if m == nil || m.Topic == nil {
+		return ""
+	}
+	return *m.Topic
+}
+
+// GetRequestId returns *m.RequestId, or 0 if m or the field is nil.
+func (m *CommandPartitionedTopicMetadata) GetRequestId() uint64 {
+	if m == nil || m.RequestId == nil {
+		return 0
+	}
+	return *m.RequestId
+}
+
+// GetOriginalPrincipal returns *m.OriginalPrincipal, or "" if m or the field is nil.
+func (m *CommandPartitionedTopicMetadata) GetOriginalPrincipal() string {
+	if m == nil || m.OriginalPrincipal == nil {
+		return ""
+	}
+	return *m.OriginalPrincipal
+}
+
+// GetOriginalAuthData returns *m.OriginalAuthData, or "" if m or the field is nil.
+func (m *CommandPartitionedTopicMetadata) GetOriginalAuthData() string {
+	if m == nil || m.OriginalAuthData == nil {
+		return ""
+	}
+	return *m.OriginalAuthData
+}
+
+// GetOriginalAuthMethod returns *m.OriginalAuthMethod, or "" if m or the field is nil.
+func (m *CommandPartitionedTopicMetadata) GetOriginalAuthMethod() string {
+	if m == nil || m.OriginalAuthMethod == nil {
+		return ""
+	}
+	return *m.OriginalAuthMethod
+}
+
+// CommandPartitionedTopicMetadataResponse is a generated protobuf message type
+// (see the struct tags). RequestId is the only field tagged "req"; Partitions,
+// Response, Error and Message are optional. The XXX_ fields carry no protobuf
+// tag and are excluded from JSON output.
+type CommandPartitionedTopicMetadataResponse struct {
+	Partitions           *uint32                                             `protobuf:"varint,1,opt,name=partitions" json:"partitions,omitempty"`
+	RequestId            *uint64                                             `protobuf:"varint,2,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+	Response             *CommandPartitionedTopicMetadataResponse_LookupType `protobuf:"varint,3,opt,name=response,enum=pulsar.proto.CommandPartitionedTopicMetadataResponse_LookupType" json:"response,omitempty"`
+	Error                *ServerError                                        `protobuf:"varint,4,opt,name=error,enum=pulsar.proto.ServerError" json:"error,omitempty"`
+	Message              *string                                             `protobuf:"bytes,5,opt,name=message" json:"message,omitempty"`
+	XXX_NoUnkeyedLiteral struct{}                                            `json:"-"`
+	XXX_unrecognized     []byte                                              `json:"-"`
+	XXX_sizecache        int32                                               `json:"-"`
+}
+
+func (m *CommandPartitionedTopicMetadataResponse) Reset() {
+	*m = CommandPartitionedTopicMetadataResponse{}
+}
+func (m *CommandPartitionedTopicMetadataResponse) String() string { return proto.CompactTextString(m) }
+func (*CommandPartitionedTopicMetadataResponse) ProtoMessage()    {}
+func (*CommandPartitionedTopicMetadataResponse) Descriptor() ([]byte, []int) {
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{11}
+}
+func (m *CommandPartitionedTopicMetadataResponse) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandPartitionedTopicMetadataResponse.Unmarshal(m, b)
+}
+func (m *CommandPartitionedTopicMetadataResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandPartitionedTopicMetadataResponse.Marshal(b, m, deterministic)
+}
+func (dst *CommandPartitionedTopicMetadataResponse) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandPartitionedTopicMetadataResponse.Merge(dst, src)
+}
+func (m *CommandPartitionedTopicMetadataResponse) XXX_Size() int {
+	return xxx_messageInfo_CommandPartitionedTopicMetadataResponse.Size(m)
+}
+func (m *CommandPartitionedTopicMetadataResponse) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandPartitionedTopicMetadataResponse.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_CommandPartitionedTopicMetadataResponse proto.InternalMessageInfo
+
+// GetPartitions returns *m.Partitions, or 0 if m or the field is nil.
+func (m *CommandPartitionedTopicMetadataResponse) GetPartitions() uint32 {
+	if m == nil || m.Partitions == nil {
+		return 0
+	}
+	return *m.Partitions
+}
+
+// GetRequestId returns *m.RequestId, or 0 if m or the field is nil.
+func (m *CommandPartitionedTopicMetadataResponse) GetRequestId() uint64 {
+	if m == nil || m.RequestId == nil {
+		return 0
+	}
+	return *m.RequestId
+}
+
+// GetResponse returns *m.Response, or
+// CommandPartitionedTopicMetadataResponse_Success if m or the field is nil.
+func (m *CommandPartitionedTopicMetadataResponse) GetResponse() CommandPartitionedTopicMetadataResponse_LookupType {
+	if m == nil || m.Response == nil {
+		return CommandPartitionedTopicMetadataResponse_Success
+	}
+	return *m.Response
+}
+
+// GetError returns *m.Error, or ServerError_UnknownError if m or the field is nil.
+func (m *CommandPartitionedTopicMetadataResponse) GetError() ServerError {
+	if m == nil || m.Error == nil {
+		return ServerError_UnknownError
+	}
+	return *m.Error
+}
+
+// GetMessage returns *m.Message, or "" if m or the field is nil.
+func (m *CommandPartitionedTopicMetadataResponse) GetMessage() string {
+	if m == nil || m.Message == nil {
+		return ""
+	}
+	return *m.Message
+}
+
+// CommandLookupTopic is a generated protobuf message type (see the struct
+// tags). Topic and RequestId are tagged "req"; Authoritative is optional with
+// a declared default of 0 (false), and the three Original* fields are optional.
+// The XXX_ fields carry no protobuf tag and are excluded from JSON output.
+type CommandLookupTopic struct {
+	Topic         *string `protobuf:"bytes,1,req,name=topic" json:"topic,omitempty"`
+	RequestId     *uint64 `protobuf:"varint,2,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+	Authoritative *bool   `protobuf:"varint,3,opt,name=authoritative,def=0" json:"authoritative,omitempty"`
+	// TODO - Remove original_principal, original_auth_data, original_auth_method
+	// Original principal that was verified by
+	// a Pulsar proxy.
+	OriginalPrincipal *string `protobuf:"bytes,4,opt,name=original_principal,json=originalPrincipal" json:"original_principal,omitempty"`
+	// Original auth role and auth Method that was passed
+	// to the proxy.
+	OriginalAuthData     *string  `protobuf:"bytes,5,opt,name=original_auth_data,json=originalAuthData" json:"original_auth_data,omitempty"`
+	OriginalAuthMethod   *string  `protobuf:"bytes,6,opt,name=original_auth_method,json=originalAuthMethod" json:"original_auth_method,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+// Reset overwrites the receiver with a zero-valued CommandLookupTopic.
+func (m *CommandLookupTopic) Reset() {
+	*m = CommandLookupTopic{}
+}
+
+// String returns the result of proto.CompactTextString for the receiver.
+func (m *CommandLookupTopic) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage has an empty body; it does nothing when called.
+func (*CommandLookupTopic) ProtoMessage() {}
+
+// Descriptor returns fileDescriptor_PulsarApi_9ff3bd5e091a4809 together with
+// the index path []int{12}.
+func (*CommandLookupTopic) Descriptor() ([]byte, []int) {
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{12}
+}
+
+// XXX_Unmarshal forwards to xxx_messageInfo_CommandLookupTopic.Unmarshal with
+// the receiver and b.
+func (m *CommandLookupTopic) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandLookupTopic.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to xxx_messageInfo_CommandLookupTopic.Marshal, passing
+// b, the receiver and the deterministic flag.
+func (m *CommandLookupTopic) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandLookupTopic.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to xxx_messageInfo_CommandLookupTopic.Merge with the
+// receiver as destination and src as source.
+func (m *CommandLookupTopic) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandLookupTopic.Merge(m, src)
+}
+
+// XXX_Size forwards to xxx_messageInfo_CommandLookupTopic.Size for the receiver.
+func (m *CommandLookupTopic) XXX_Size() int {
+	return xxx_messageInfo_CommandLookupTopic.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to
+// xxx_messageInfo_CommandLookupTopic.DiscardUnknown for the receiver.
+func (m *CommandLookupTopic) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandLookupTopic.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandLookupTopic is the proto.InternalMessageInfo used by
+// the XXX_ methods of CommandLookupTopic.
+var xxx_messageInfo_CommandLookupTopic proto.InternalMessageInfo
+
+// Default_CommandLookupTopic_Authoritative is returned by GetAuthoritative
+// when the field is unset.
+const Default_CommandLookupTopic_Authoritative bool = false
+
+// GetTopic returns *m.Topic, or "" if m or the field is nil.
+func (m *CommandLookupTopic) GetTopic() string {
+	if m == nil || m.Topic == nil {
+		return ""
+	}
+	return *m.Topic
+}
+
+// GetRequestId returns *m.RequestId, or 0 if m or the field is nil.
+func (m *CommandLookupTopic) GetRequestId() uint64 {
+	if m == nil || m.RequestId == nil {
+		return 0
+	}
+	return *m.RequestId
+}
+
+// GetAuthoritative returns *m.Authoritative, or
+// Default_CommandLookupTopic_Authoritative if m or the field is nil.
+func (m *CommandLookupTopic) GetAuthoritative() bool {
+	if m == nil || m.Authoritative == nil {
+		return Default_CommandLookupTopic_Authoritative
+	}
+	return *m.Authoritative
+}
+
+// GetOriginalPrincipal returns *m.OriginalPrincipal, or "" if m or the field is nil.
+func (m *CommandLookupTopic) GetOriginalPrincipal() string {
+	if m == nil || m.OriginalPrincipal == nil {
+		return ""
+	}
+	return *m.OriginalPrincipal
+}
+
+// GetOriginalAuthData returns *m.OriginalAuthData, or "" if m or the field is nil.
+func (m *CommandLookupTopic) GetOriginalAuthData() string {
+	if m == nil || m.OriginalAuthData == nil {
+		return ""
+	}
+	return *m.OriginalAuthData
+}
+
+// GetOriginalAuthMethod returns *m.OriginalAuthMethod, or "" if m or the field is nil.
+func (m *CommandLookupTopic) GetOriginalAuthMethod() string {
+	if m == nil || m.OriginalAuthMethod == nil {
+		return ""
+	}
+	return *m.OriginalAuthMethod
+}
+
+// CommandLookupTopicResponse is a generated protobuf message type (see the
+// struct tags). RequestId is the only field tagged "req"; all other tagged
+// fields are optional, with Authoritative and ProxyThroughServiceUrl declaring
+// a default of 0 (false). The XXX_ fields carry no protobuf tag and are
+// excluded from JSON output.
+type CommandLookupTopicResponse struct {
+	BrokerServiceUrl    *string                                `protobuf:"bytes,1,opt,name=brokerServiceUrl" json:"brokerServiceUrl,omitempty"`
+	BrokerServiceUrlTls *string                                `protobuf:"bytes,2,opt,name=brokerServiceUrlTls" json:"brokerServiceUrlTls,omitempty"`
+	Response            *CommandLookupTopicResponse_LookupType `protobuf:"varint,3,opt,name=response,enum=pulsar.proto.CommandLookupTopicResponse_LookupType" json:"response,omitempty"`
+	RequestId           *uint64                                `protobuf:"varint,4,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+	Authoritative       *bool                                  `protobuf:"varint,5,opt,name=authoritative,def=0" json:"authoritative,omitempty"`
+	Error               *ServerError                           `protobuf:"varint,6,opt,name=error,enum=pulsar.proto.ServerError" json:"error,omitempty"`
+	Message             *string                                `protobuf:"bytes,7,opt,name=message" json:"message,omitempty"`
+	// If it's true, indicates to the client that it must
+	// always connect through the service url after the
+	// lookup has been completed.
+	ProxyThroughServiceUrl *bool    `protobuf:"varint,8,opt,name=proxy_through_service_url,json=proxyThroughServiceUrl,def=0" json:"proxy_through_service_url,omitempty"`
+	XXX_NoUnkeyedLiteral   struct{} `json:"-"`
+	XXX_unrecognized       []byte   `json:"-"`
+	XXX_sizecache          int32    `json:"-"`
+}
+
+// Reset overwrites the receiver with a zero-valued CommandLookupTopicResponse.
+func (m *CommandLookupTopicResponse) Reset() {
+	*m = CommandLookupTopicResponse{}
+}
+
+// String returns the result of proto.CompactTextString for the receiver.
+func (m *CommandLookupTopicResponse) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage has an empty body; it does nothing when called.
+func (*CommandLookupTopicResponse) ProtoMessage() {}
+
+// Descriptor returns fileDescriptor_PulsarApi_9ff3bd5e091a4809 together with
+// the index path []int{13}.
+func (*CommandLookupTopicResponse) Descriptor() ([]byte, []int) {
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{13}
+}
+
+// XXX_Unmarshal forwards to xxx_messageInfo_CommandLookupTopicResponse.Unmarshal
+// with the receiver and b.
+func (m *CommandLookupTopicResponse) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandLookupTopicResponse.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to xxx_messageInfo_CommandLookupTopicResponse.Marshal,
+// passing b, the receiver and the deterministic flag.
+func (m *CommandLookupTopicResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandLookupTopicResponse.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to xxx_messageInfo_CommandLookupTopicResponse.Merge with
+// the receiver as destination and src as source.
+func (m *CommandLookupTopicResponse) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandLookupTopicResponse.Merge(m, src)
+}
+
+// XXX_Size forwards to xxx_messageInfo_CommandLookupTopicResponse.Size for the
+// receiver.
+func (m *CommandLookupTopicResponse) XXX_Size() int {
+	return xxx_messageInfo_CommandLookupTopicResponse.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to
+// xxx_messageInfo_CommandLookupTopicResponse.DiscardUnknown for the receiver.
+func (m *CommandLookupTopicResponse) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandLookupTopicResponse.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandLookupTopicResponse is the proto.InternalMessageInfo
+// used by the XXX_ methods of CommandLookupTopicResponse.
+var xxx_messageInfo_CommandLookupTopicResponse proto.InternalMessageInfo
+
+const Default_CommandLookupTopicResponse_Authoritative bool = false
+const Default_CommandLookupTopicResponse_ProxyThroughServiceUrl bool = false
+
+func (m *CommandLookupTopicResponse) GetBrokerServiceUrl() string {
+	if m != nil && m.BrokerServiceUrl != nil {
+		return *m.BrokerServiceUrl
+	}
+	return ""
+}
+
+func (m *CommandLookupTopicResponse) GetBrokerServiceUrlTls() string {
+	if m != nil && m.BrokerServiceUrlTls != nil {
+		return *m.BrokerServiceUrlTls
+	}
+	return ""
+}
+
+func (m *CommandLookupTopicResponse) GetResponse() CommandLookupTopicResponse_LookupType {
+	if m != nil && m.Response != nil {
+		return *m.Response
+	}
+	return CommandLookupTopicResponse_Redirect
+}
+
+func (m *CommandLookupTopicResponse) GetRequestId() uint64 {
+	if m != nil && m.RequestId != nil {
+		return *m.RequestId
+	}
+	return 0
+}
+
+func (m *CommandLookupTopicResponse) GetAuthoritative() bool {
+	if m != nil && m.Authoritative != nil {
+		return *m.Authoritative
+	}
+	return Default_CommandLookupTopicResponse_Authoritative
+}
+
+func (m *CommandLookupTopicResponse) GetError() ServerError {
+	if m != nil && m.Error != nil {
+		return *m.Error
+	}
+	return ServerError_UnknownError
+}
+
+func (m *CommandLookupTopicResponse) GetMessage() string {
+	if m != nil && m.Message != nil {
+		return *m.Message
+	}
+	return ""
+}
+
+func (m *CommandLookupTopicResponse) GetProxyThroughServiceUrl() bool {
+	if m != nil && m.ProxyThroughServiceUrl != nil {
+		return *m.ProxyThroughServiceUrl
+	}
+	return Default_CommandLookupTopicResponse_ProxyThroughServiceUrl
+}
+
+// CommandProducer creates a new Producer on a topic, assigning the given
+// producer_id; all messages sent with this producer_id will be persisted on
+// the topic.
+type CommandProducer struct {
+	Topic      *string `protobuf:"bytes,1,req,name=topic" json:"topic,omitempty"`
+	ProducerId *uint64 `protobuf:"varint,2,req,name=producer_id,json=producerId" json:"producer_id,omitempty"`
+	RequestId  *uint64 `protobuf:"varint,3,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+	// If a producer name is specified, the name will be used,
+	// otherwise the broker will generate a unique name.
+	ProducerName *string `protobuf:"bytes,4,opt,name=producer_name,json=producerName" json:"producer_name,omitempty"`
+	Encrypted    *bool   `protobuf:"varint,5,opt,name=encrypted,def=0" json:"encrypted,omitempty"`
+	// Optional metadata key=value pairs attached to this producer.
+	Metadata             []*KeyValue `protobuf:"bytes,6,rep,name=metadata" json:"metadata,omitempty"`
+	Schema               *Schema     `protobuf:"bytes,7,opt,name=schema" json:"schema,omitempty"`
+	XXX_NoUnkeyedLiteral struct{}    `json:"-"`
+	XXX_unrecognized     []byte      `json:"-"`
+	XXX_sizecache        int32       `json:"-"`
+}
+
+// Reset overwrites the receiver with a zero-valued CommandProducer.
+func (m *CommandProducer) Reset() {
+	*m = CommandProducer{}
+}
+
+// String formats the message via proto.CompactTextString.
+func (m *CommandProducer) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage is an empty marker method.
+func (*CommandProducer) ProtoMessage() {}
+
+// Descriptor returns the file descriptor variable and the index path {14}.
+func (*CommandProducer) Descriptor() ([]byte, []int) {
+	path := []int{14}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal forwards to the Unmarshal method of the message info below.
+func (m *CommandProducer) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandProducer.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to the Marshal method of the message info below.
+func (m *CommandProducer) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandProducer.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to the Merge method of the message info below, with the
+// receiver as destination.
+func (m *CommandProducer) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandProducer.Merge(m, src)
+}
+
+// XXX_Size forwards to the Size method of the message info below.
+func (m *CommandProducer) XXX_Size() int {
+	return xxx_messageInfo_CommandProducer.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to the DiscardUnknown method of the message info below.
+func (m *CommandProducer) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandProducer.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandProducer is the message info the XXX_* methods above
+// delegate to.
+var xxx_messageInfo_CommandProducer proto.InternalMessageInfo
+
+// Default_CommandProducer_Encrypted is what GetEncrypted returns for an unset field.
+const Default_CommandProducer_Encrypted bool = false
+
+// GetTopic returns *m.Topic, or "" if m or the field is nil.
+func (m *CommandProducer) GetTopic() string {
+	if m == nil || m.Topic == nil {
+		return ""
+	}
+	return *m.Topic
+}
+
+// GetProducerId returns *m.ProducerId, or 0 if m or the field is nil.
+func (m *CommandProducer) GetProducerId() uint64 {
+	if m == nil || m.ProducerId == nil {
+		return 0
+	}
+	return *m.ProducerId
+}
+
+// GetRequestId returns *m.RequestId, or 0 if m or the field is nil.
+func (m *CommandProducer) GetRequestId() uint64 {
+	if m == nil || m.RequestId == nil {
+		return 0
+	}
+	return *m.RequestId
+}
+
+// GetProducerName returns *m.ProducerName, or "" if m or the field is nil.
+func (m *CommandProducer) GetProducerName() string {
+	if m == nil || m.ProducerName == nil {
+		return ""
+	}
+	return *m.ProducerName
+}
+
+// GetEncrypted returns *m.Encrypted, or the declared default if m or the
+// field is nil.
+func (m *CommandProducer) GetEncrypted() bool {
+	if m == nil || m.Encrypted == nil {
+		return Default_CommandProducer_Encrypted
+	}
+	return *m.Encrypted
+}
+
+// GetMetadata returns m.Metadata, or nil for a nil receiver.
+func (m *CommandProducer) GetMetadata() []*KeyValue {
+	if m == nil {
+		return nil
+	}
+	return m.Metadata
+}
+
+// GetSchema returns m.Schema, or nil for a nil receiver.
+func (m *CommandProducer) GetSchema() *Schema {
+	if m == nil {
+		return nil
+	}
+	return m.Schema
+}
+
+// CommandSend is the Go struct for the CommandSend protobuf message.
+type CommandSend struct {
+	ProducerId           *uint64  `protobuf:"varint,1,req,name=producer_id,json=producerId" json:"producer_id,omitempty"`
+	SequenceId           *uint64  `protobuf:"varint,2,req,name=sequence_id,json=sequenceId" json:"sequence_id,omitempty"`
+	NumMessages          *int32   `protobuf:"varint,3,opt,name=num_messages,json=numMessages,def=1" json:"num_messages,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+// Reset overwrites the receiver with a zero-valued CommandSend.
+func (m *CommandSend) Reset() {
+	*m = CommandSend{}
+}
+
+// String formats the message via proto.CompactTextString.
+func (m *CommandSend) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage is an empty marker method.
+func (*CommandSend) ProtoMessage() {}
+
+// Descriptor returns the file descriptor variable and the index path {15}.
+func (*CommandSend) Descriptor() ([]byte, []int) {
+	path := []int{15}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal forwards to the Unmarshal method of the message info below.
+func (m *CommandSend) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandSend.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to the Marshal method of the message info below.
+func (m *CommandSend) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandSend.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to the Merge method of the message info below, with the
+// receiver as destination.
+func (m *CommandSend) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandSend.Merge(m, src)
+}
+
+// XXX_Size forwards to the Size method of the message info below.
+func (m *CommandSend) XXX_Size() int {
+	return xxx_messageInfo_CommandSend.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to the DiscardUnknown method of the message info below.
+func (m *CommandSend) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandSend.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandSend is the message info the XXX_* methods above
+// delegate to.
+var xxx_messageInfo_CommandSend proto.InternalMessageInfo
+
+// Default_CommandSend_NumMessages is what GetNumMessages returns for an unset field.
+const Default_CommandSend_NumMessages int32 = 1
+
+// GetProducerId returns *m.ProducerId, or 0 if m or the field is nil.
+func (m *CommandSend) GetProducerId() uint64 {
+	if m == nil || m.ProducerId == nil {
+		return 0
+	}
+	return *m.ProducerId
+}
+
+// GetSequenceId returns *m.SequenceId, or 0 if m or the field is nil.
+func (m *CommandSend) GetSequenceId() uint64 {
+	if m == nil || m.SequenceId == nil {
+		return 0
+	}
+	return *m.SequenceId
+}
+
+// GetNumMessages returns *m.NumMessages, or the declared default (1) if m or
+// the field is nil.
+func (m *CommandSend) GetNumMessages() int32 {
+	if m == nil || m.NumMessages == nil {
+		return Default_CommandSend_NumMessages
+	}
+	return *m.NumMessages
+}
+
+// CommandSendReceipt is the Go struct for the CommandSendReceipt protobuf message.
+type CommandSendReceipt struct {
+	ProducerId           *uint64        `protobuf:"varint,1,req,name=producer_id,json=producerId" json:"producer_id,omitempty"`
+	SequenceId           *uint64        `protobuf:"varint,2,req,name=sequence_id,json=sequenceId" json:"sequence_id,omitempty"`
+	MessageId            *MessageIdData `protobuf:"bytes,3,opt,name=message_id,json=messageId" json:"message_id,omitempty"`
+	XXX_NoUnkeyedLiteral struct{}       `json:"-"`
+	XXX_unrecognized     []byte         `json:"-"`
+	XXX_sizecache        int32          `json:"-"`
+}
+
+// Reset overwrites the receiver with a zero-valued CommandSendReceipt.
+func (m *CommandSendReceipt) Reset() {
+	*m = CommandSendReceipt{}
+}
+
+// String formats the message via proto.CompactTextString.
+func (m *CommandSendReceipt) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage is an empty marker method.
+func (*CommandSendReceipt) ProtoMessage() {}
+
+// Descriptor returns the file descriptor variable and the index path {16}.
+func (*CommandSendReceipt) Descriptor() ([]byte, []int) {
+	path := []int{16}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal forwards to the Unmarshal method of the message info below.
+func (m *CommandSendReceipt) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandSendReceipt.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to the Marshal method of the message info below.
+func (m *CommandSendReceipt) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandSendReceipt.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to the Merge method of the message info below, with the
+// receiver as destination.
+func (m *CommandSendReceipt) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandSendReceipt.Merge(m, src)
+}
+
+// XXX_Size forwards to the Size method of the message info below.
+func (m *CommandSendReceipt) XXX_Size() int {
+	return xxx_messageInfo_CommandSendReceipt.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to the DiscardUnknown method of the message info below.
+func (m *CommandSendReceipt) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandSendReceipt.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandSendReceipt is the message info the XXX_* methods
+// above delegate to.
+var xxx_messageInfo_CommandSendReceipt proto.InternalMessageInfo
+
+// GetProducerId returns *m.ProducerId, or 0 if m or the field is nil.
+func (m *CommandSendReceipt) GetProducerId() uint64 {
+	if m == nil || m.ProducerId == nil {
+		return 0
+	}
+	return *m.ProducerId
+}
+
+// GetSequenceId returns *m.SequenceId, or 0 if m or the field is nil.
+func (m *CommandSendReceipt) GetSequenceId() uint64 {
+	if m == nil || m.SequenceId == nil {
+		return 0
+	}
+	return *m.SequenceId
+}
+
+// GetMessageId returns m.MessageId, or nil for a nil receiver.
+func (m *CommandSendReceipt) GetMessageId() *MessageIdData {
+	if m == nil {
+		return nil
+	}
+	return m.MessageId
+}
+
+// CommandSendError is the Go struct for the CommandSendError protobuf message.
+type CommandSendError struct {
+	ProducerId           *uint64      `protobuf:"varint,1,req,name=producer_id,json=producerId" json:"producer_id,omitempty"`
+	SequenceId           *uint64      `protobuf:"varint,2,req,name=sequence_id,json=sequenceId" json:"sequence_id,omitempty"`
+	Error                *ServerError `protobuf:"varint,3,req,name=error,enum=pulsar.proto.ServerError" json:"error,omitempty"`
+	Message              *string      `protobuf:"bytes,4,req,name=message" json:"message,omitempty"`
+	XXX_NoUnkeyedLiteral struct{}     `json:"-"`
+	XXX_unrecognized     []byte       `json:"-"`
+	XXX_sizecache        int32        `json:"-"`
+}
+
+// Reset overwrites the receiver with a zero-valued CommandSendError.
+func (m *CommandSendError) Reset() {
+	*m = CommandSendError{}
+}
+
+// String formats the message via proto.CompactTextString.
+func (m *CommandSendError) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage is an empty marker method.
+func (*CommandSendError) ProtoMessage() {}
+
+// Descriptor returns the file descriptor variable and the index path {17}.
+func (*CommandSendError) Descriptor() ([]byte, []int) {
+	path := []int{17}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal forwards to the Unmarshal method of the message info below.
+func (m *CommandSendError) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandSendError.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to the Marshal method of the message info below.
+func (m *CommandSendError) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandSendError.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to the Merge method of the message info below, with the
+// receiver as destination.
+func (m *CommandSendError) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandSendError.Merge(m, src)
+}
+
+// XXX_Size forwards to the Size method of the message info below.
+func (m *CommandSendError) XXX_Size() int {
+	return xxx_messageInfo_CommandSendError.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to the DiscardUnknown method of the message info below.
+func (m *CommandSendError) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandSendError.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandSendError is the message info the XXX_* methods above
+// delegate to.
+var xxx_messageInfo_CommandSendError proto.InternalMessageInfo
+
+// GetProducerId returns *m.ProducerId, or 0 if m or the field is nil.
+func (m *CommandSendError) GetProducerId() uint64 {
+	if m == nil || m.ProducerId == nil {
+		return 0
+	}
+	return *m.ProducerId
+}
+
+// GetSequenceId returns *m.SequenceId, or 0 if m or the field is nil.
+func (m *CommandSendError) GetSequenceId() uint64 {
+	if m == nil || m.SequenceId == nil {
+		return 0
+	}
+	return *m.SequenceId
+}
+
+// GetError returns *m.Error, or ServerError_UnknownError if m or the field is nil.
+func (m *CommandSendError) GetError() ServerError {
+	if m == nil || m.Error == nil {
+		return ServerError_UnknownError
+	}
+	return *m.Error
+}
+
+// GetMessage returns *m.Message, or "" if m or the field is nil.
+func (m *CommandSendError) GetMessage() string {
+	if m == nil || m.Message == nil {
+		return ""
+	}
+	return *m.Message
+}
+
+// CommandMessage is the Go struct for the CommandMessage protobuf message.
+type CommandMessage struct {
+	ConsumerId           *uint64        `protobuf:"varint,1,req,name=consumer_id,json=consumerId" json:"consumer_id,omitempty"`
+	MessageId            *MessageIdData `protobuf:"bytes,2,req,name=message_id,json=messageId" json:"message_id,omitempty"`
+	RedeliveryCount      *uint32        `protobuf:"varint,3,opt,name=redelivery_count,json=redeliveryCount,def=0" json:"redelivery_count,omitempty"`
+	XXX_NoUnkeyedLiteral struct{}       `json:"-"`
+	XXX_unrecognized     []byte         `json:"-"`
+	XXX_sizecache        int32          `json:"-"`
+}
+
+// Reset overwrites the receiver with a zero-valued CommandMessage.
+func (m *CommandMessage) Reset() {
+	*m = CommandMessage{}
+}
+
+// String formats the message via proto.CompactTextString.
+func (m *CommandMessage) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage is an empty marker method.
+func (*CommandMessage) ProtoMessage() {}
+
+// Descriptor returns the file descriptor variable and the index path {18}.
+func (*CommandMessage) Descriptor() ([]byte, []int) {
+	path := []int{18}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal forwards to the Unmarshal method of the message info below.
+func (m *CommandMessage) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandMessage.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to the Marshal method of the message info below.
+func (m *CommandMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandMessage.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to the Merge method of the message info below, with the
+// receiver as destination.
+func (m *CommandMessage) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandMessage.Merge(m, src)
+}
+
+// XXX_Size forwards to the Size method of the message info below.
+func (m *CommandMessage) XXX_Size() int {
+	return xxx_messageInfo_CommandMessage.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to the DiscardUnknown method of the message info below.
+func (m *CommandMessage) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandMessage.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandMessage is the message info the XXX_* methods above
+// delegate to.
+var xxx_messageInfo_CommandMessage proto.InternalMessageInfo
+
+// Default_CommandMessage_RedeliveryCount is what GetRedeliveryCount returns
+// for an unset field.
+const Default_CommandMessage_RedeliveryCount uint32 = 0
+
+// GetConsumerId returns *m.ConsumerId, or 0 if m or the field is nil.
+func (m *CommandMessage) GetConsumerId() uint64 {
+	if m == nil || m.ConsumerId == nil {
+		return 0
+	}
+	return *m.ConsumerId
+}
+
+// GetMessageId returns m.MessageId, or nil for a nil receiver.
+func (m *CommandMessage) GetMessageId() *MessageIdData {
+	if m == nil {
+		return nil
+	}
+	return m.MessageId
+}
+
+// GetRedeliveryCount returns *m.RedeliveryCount, or the declared default if m
+// or the field is nil.
+func (m *CommandMessage) GetRedeliveryCount() uint32 {
+	if m == nil || m.RedeliveryCount == nil {
+		return Default_CommandMessage_RedeliveryCount
+	}
+	return *m.RedeliveryCount
+}
+
+// CommandAck is the Go struct for the CommandAck protobuf message.
+type CommandAck struct {
+	ConsumerId *uint64             `protobuf:"varint,1,req,name=consumer_id,json=consumerId" json:"consumer_id,omitempty"`
+	AckType    *CommandAck_AckType `protobuf:"varint,2,req,name=ack_type,json=ackType,enum=pulsar.proto.CommandAck_AckType" json:"ack_type,omitempty"`
+	// For individual acks the client may pass a list of message ids.
+	MessageId            []*MessageIdData            `protobuf:"bytes,3,rep,name=message_id,json=messageId" json:"message_id,omitempty"`
+	ValidationError      *CommandAck_ValidationError `protobuf:"varint,4,opt,name=validation_error,json=validationError,enum=pulsar.proto.CommandAck_ValidationError" json:"validation_error,omitempty"`
+	Properties           []*KeyLongValue             `protobuf:"bytes,5,rep,name=properties" json:"properties,omitempty"`
+	XXX_NoUnkeyedLiteral struct{}                    `json:"-"`
+	XXX_unrecognized     []byte                      `json:"-"`
+	XXX_sizecache        int32                       `json:"-"`
+}
+
+// Reset overwrites the receiver with a zero-valued CommandAck.
+func (m *CommandAck) Reset() {
+	*m = CommandAck{}
+}
+
+// String formats the message via proto.CompactTextString.
+func (m *CommandAck) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage is an empty marker method.
+func (*CommandAck) ProtoMessage() {}
+
+// Descriptor returns the file descriptor variable and the index path {19}.
+func (*CommandAck) Descriptor() ([]byte, []int) {
+	path := []int{19}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal forwards to the Unmarshal method of the message info below.
+func (m *CommandAck) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandAck.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to the Marshal method of the message info below.
+func (m *CommandAck) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandAck.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to the Merge method of the message info below, with the
+// receiver as destination.
+func (m *CommandAck) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandAck.Merge(m, src)
+}
+
+// XXX_Size forwards to the Size method of the message info below.
+func (m *CommandAck) XXX_Size() int {
+	return xxx_messageInfo_CommandAck.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to the DiscardUnknown method of the message info below.
+func (m *CommandAck) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandAck.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandAck is the message info the XXX_* methods above
+// delegate to.
+var xxx_messageInfo_CommandAck proto.InternalMessageInfo
+
+// GetConsumerId returns *m.ConsumerId, or 0 if m or the field is nil.
+func (m *CommandAck) GetConsumerId() uint64 {
+	if m == nil || m.ConsumerId == nil {
+		return 0
+	}
+	return *m.ConsumerId
+}
+
+// GetAckType returns *m.AckType, or CommandAck_Individual if m or the field is nil.
+func (m *CommandAck) GetAckType() CommandAck_AckType {
+	if m == nil || m.AckType == nil {
+		return CommandAck_Individual
+	}
+	return *m.AckType
+}
+
+// GetMessageId returns m.MessageId, or nil for a nil receiver.
+func (m *CommandAck) GetMessageId() []*MessageIdData {
+	if m == nil {
+		return nil
+	}
+	return m.MessageId
+}
+
+// GetValidationError returns *m.ValidationError, or
+// CommandAck_UncompressedSizeCorruption if m or the field is nil.
+func (m *CommandAck) GetValidationError() CommandAck_ValidationError {
+	if m == nil || m.ValidationError == nil {
+		return CommandAck_UncompressedSizeCorruption
+	}
+	return *m.ValidationError
+}
+
+// GetProperties returns m.Properties, or nil for a nil receiver.
+func (m *CommandAck) GetProperties() []*KeyLongValue {
+	if m == nil {
+		return nil
+	}
+	return m.Properties
+}
+
+// CommandActiveConsumerChange reports changes on the active consumer.
+type CommandActiveConsumerChange struct {
+	ConsumerId           *uint64  `protobuf:"varint,1,req,name=consumer_id,json=consumerId" json:"consumer_id,omitempty"`
+	IsActive             *bool    `protobuf:"varint,2,opt,name=is_active,json=isActive,def=0" json:"is_active,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+// Reset overwrites the receiver with a zero-valued CommandActiveConsumerChange.
+func (m *CommandActiveConsumerChange) Reset() {
+	*m = CommandActiveConsumerChange{}
+}
+
+// String formats the message via proto.CompactTextString.
+func (m *CommandActiveConsumerChange) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage is an empty marker method.
+func (*CommandActiveConsumerChange) ProtoMessage() {}
+
+// Descriptor returns the file descriptor variable and the index path {20}.
+func (*CommandActiveConsumerChange) Descriptor() ([]byte, []int) {
+	path := []int{20}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal forwards to the Unmarshal method of the message info below.
+func (m *CommandActiveConsumerChange) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandActiveConsumerChange.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to the Marshal method of the message info below.
+func (m *CommandActiveConsumerChange) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandActiveConsumerChange.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to the Merge method of the message info below, with the
+// receiver as destination.
+func (m *CommandActiveConsumerChange) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandActiveConsumerChange.Merge(m, src)
+}
+
+// XXX_Size forwards to the Size method of the message info below.
+func (m *CommandActiveConsumerChange) XXX_Size() int {
+	return xxx_messageInfo_CommandActiveConsumerChange.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to the DiscardUnknown method of the message info below.
+func (m *CommandActiveConsumerChange) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandActiveConsumerChange.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandActiveConsumerChange is the message info the XXX_*
+// methods above delegate to.
+var xxx_messageInfo_CommandActiveConsumerChange proto.InternalMessageInfo
+
+// Default_CommandActiveConsumerChange_IsActive is what GetIsActive returns for
+// an unset field.
+const Default_CommandActiveConsumerChange_IsActive bool = false
+
+// GetConsumerId returns *m.ConsumerId, or 0 if m or the field is nil.
+func (m *CommandActiveConsumerChange) GetConsumerId() uint64 {
+	if m == nil || m.ConsumerId == nil {
+		return 0
+	}
+	return *m.ConsumerId
+}
+
+// GetIsActive returns *m.IsActive, or the declared default if m or the field
+// is nil.
+func (m *CommandActiveConsumerChange) GetIsActive() bool {
+	if m == nil || m.IsActive == nil {
+		return Default_CommandActiveConsumerChange_IsActive
+	}
+	return *m.IsActive
+}
+
+// CommandFlow is the Go struct for the CommandFlow protobuf message.
+type CommandFlow struct {
+	ConsumerId *uint64 `protobuf:"varint,1,req,name=consumer_id,json=consumerId" json:"consumer_id,omitempty"`
+	// Max number of messages to prefetch, in addition
+	// to any number previously specified.
+	MessagePermits       *uint32  `protobuf:"varint,2,req,name=messagePermits" json:"messagePermits,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+// Reset overwrites the receiver with a zero-valued CommandFlow.
+func (m *CommandFlow) Reset() {
+	*m = CommandFlow{}
+}
+
+// String formats the message via proto.CompactTextString.
+func (m *CommandFlow) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage is an empty marker method.
+func (*CommandFlow) ProtoMessage() {}
+
+// Descriptor returns the file descriptor variable and the index path {21}.
+func (*CommandFlow) Descriptor() ([]byte, []int) {
+	path := []int{21}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal forwards to the Unmarshal method of the message info below.
+func (m *CommandFlow) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandFlow.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to the Marshal method of the message info below.
+func (m *CommandFlow) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandFlow.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to the Merge method of the message info below, with the
+// receiver as destination.
+func (m *CommandFlow) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandFlow.Merge(m, src)
+}
+
+// XXX_Size forwards to the Size method of the message info below.
+func (m *CommandFlow) XXX_Size() int {
+	return xxx_messageInfo_CommandFlow.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to the DiscardUnknown method of the message info below.
+func (m *CommandFlow) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandFlow.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandFlow is the message info the XXX_* methods above
+// delegate to.
+var xxx_messageInfo_CommandFlow proto.InternalMessageInfo
+
+// GetConsumerId returns *m.ConsumerId, or 0 if m or the field is nil.
+func (m *CommandFlow) GetConsumerId() uint64 {
+	if m == nil || m.ConsumerId == nil {
+		return 0
+	}
+	return *m.ConsumerId
+}
+
+// GetMessagePermits returns *m.MessagePermits, or 0 if m or the field is nil.
+func (m *CommandFlow) GetMessagePermits() uint32 {
+	if m == nil || m.MessagePermits == nil {
+		return 0
+	}
+	return *m.MessagePermits
+}
+
+// CommandUnsubscribe is the Go struct for the CommandUnsubscribe protobuf message.
+type CommandUnsubscribe struct {
+	ConsumerId           *uint64  `protobuf:"varint,1,req,name=consumer_id,json=consumerId" json:"consumer_id,omitempty"`
+	RequestId            *uint64  `protobuf:"varint,2,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+// Reset overwrites the receiver with a zero-valued CommandUnsubscribe.
+func (m *CommandUnsubscribe) Reset() {
+	*m = CommandUnsubscribe{}
+}
+
+// String formats the message via proto.CompactTextString.
+func (m *CommandUnsubscribe) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage is an empty marker method.
+func (*CommandUnsubscribe) ProtoMessage() {}
+
+// Descriptor returns the file descriptor variable and the index path {22}.
+func (*CommandUnsubscribe) Descriptor() ([]byte, []int) {
+	path := []int{22}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal forwards to the Unmarshal method of the message info below.
+func (m *CommandUnsubscribe) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandUnsubscribe.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to the Marshal method of the message info below.
+func (m *CommandUnsubscribe) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandUnsubscribe.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to the Merge method of the message info below, with the
+// receiver as destination.
+func (m *CommandUnsubscribe) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandUnsubscribe.Merge(m, src)
+}
+
+// XXX_Size forwards to the Size method of the message info below.
+func (m *CommandUnsubscribe) XXX_Size() int {
+	return xxx_messageInfo_CommandUnsubscribe.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to the DiscardUnknown method of the message info below.
+func (m *CommandUnsubscribe) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandUnsubscribe.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandUnsubscribe is the message info the XXX_* methods
+// above delegate to.
+var xxx_messageInfo_CommandUnsubscribe proto.InternalMessageInfo
+
+// GetConsumerId returns *m.ConsumerId, or 0 if m or the field is nil.
+func (m *CommandUnsubscribe) GetConsumerId() uint64 {
+	if m == nil || m.ConsumerId == nil {
+		return 0
+	}
+	return *m.ConsumerId
+}
+
+// GetRequestId returns *m.RequestId, or 0 if m or the field is nil.
+func (m *CommandUnsubscribe) GetRequestId() uint64 {
+	if m == nil || m.RequestId == nil {
+		return 0
+	}
+	return *m.RequestId
+}
+
+// CommandSeek resets an existing consumer to a particular message id.
+type CommandSeek struct {
+	ConsumerId           *uint64        `protobuf:"varint,1,req,name=consumer_id,json=consumerId" json:"consumer_id,omitempty"`
+	RequestId            *uint64        `protobuf:"varint,2,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+	MessageId            *MessageIdData `protobuf:"bytes,3,opt,name=message_id,json=messageId" json:"message_id,omitempty"`
+	XXX_NoUnkeyedLiteral struct{}       `json:"-"`
+	XXX_unrecognized     []byte         `json:"-"`
+	XXX_sizecache        int32          `json:"-"`
+}
+
+// Reset overwrites the receiver with a zero-valued CommandSeek.
+func (m *CommandSeek) Reset() {
+	*m = CommandSeek{}
+}
+
+// String formats the message via proto.CompactTextString.
+func (m *CommandSeek) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage is an empty marker method.
+func (*CommandSeek) ProtoMessage() {}
+
+// Descriptor returns the file descriptor variable and the index path {23}.
+func (*CommandSeek) Descriptor() ([]byte, []int) {
+	path := []int{23}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal forwards to the Unmarshal method of the message info below.
+func (m *CommandSeek) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandSeek.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to the Marshal method of the message info below.
+func (m *CommandSeek) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandSeek.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to the Merge method of the message info below, with the
+// receiver as destination.
+func (m *CommandSeek) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandSeek.Merge(m, src)
+}
+
+// XXX_Size forwards to the Size method of the message info below.
+func (m *CommandSeek) XXX_Size() int {
+	return xxx_messageInfo_CommandSeek.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to the DiscardUnknown method of the message info below.
+func (m *CommandSeek) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandSeek.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandSeek is the message info the XXX_* methods above
+// delegate to.
+var xxx_messageInfo_CommandSeek proto.InternalMessageInfo
+
+// GetConsumerId returns *m.ConsumerId, or 0 if m or the field is nil.
+func (m *CommandSeek) GetConsumerId() uint64 {
+	if m == nil || m.ConsumerId == nil {
+		return 0
+	}
+	return *m.ConsumerId
+}
+
+// GetRequestId returns *m.RequestId, or 0 if m or the field is nil.
+func (m *CommandSeek) GetRequestId() uint64 {
+	if m == nil || m.RequestId == nil {
+		return 0
+	}
+	return *m.RequestId
+}
+
+// GetMessageId returns m.MessageId, or nil for a nil receiver.
+func (m *CommandSeek) GetMessageId() *MessageIdData {
+	if m == nil {
+		return nil
+	}
+	return m.MessageId
+}
+
+// CommandReachedEndOfTopic is the message sent by the broker to the client
+// when a topic has been forcefully terminated and there are no more messages
+// left to consume.
+type CommandReachedEndOfTopic struct {
+	ConsumerId           *uint64  `protobuf:"varint,1,req,name=consumer_id,json=consumerId" json:"consumer_id,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+// Reset overwrites the receiver with a zero-valued CommandReachedEndOfTopic.
+func (m *CommandReachedEndOfTopic) Reset() {
+	*m = CommandReachedEndOfTopic{}
+}
+
+// String formats the message via proto.CompactTextString.
+func (m *CommandReachedEndOfTopic) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage is an empty marker method.
+func (*CommandReachedEndOfTopic) ProtoMessage() {}
+
+// Descriptor returns the file descriptor variable and the index path {24}.
+func (*CommandReachedEndOfTopic) Descriptor() ([]byte, []int) {
+	path := []int{24}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal forwards to the Unmarshal method of the message info below.
+func (m *CommandReachedEndOfTopic) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandReachedEndOfTopic.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to the Marshal method of the message info below.
+func (m *CommandReachedEndOfTopic) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandReachedEndOfTopic.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to the Merge method of the message info below, with the
+// receiver as destination.
+func (m *CommandReachedEndOfTopic) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandReachedEndOfTopic.Merge(m, src)
+}
+
+// XXX_Size forwards to the Size method of the message info below.
+func (m *CommandReachedEndOfTopic) XXX_Size() int {
+	return xxx_messageInfo_CommandReachedEndOfTopic.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to the DiscardUnknown method of the message info below.
+func (m *CommandReachedEndOfTopic) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandReachedEndOfTopic.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandReachedEndOfTopic is the message info the XXX_*
+// methods above delegate to.
+var xxx_messageInfo_CommandReachedEndOfTopic proto.InternalMessageInfo
+
+// GetConsumerId returns *m.ConsumerId, or 0 if m or the field is nil.
+func (m *CommandReachedEndOfTopic) GetConsumerId() uint64 {
+	if m == nil || m.ConsumerId == nil {
+		return 0
+	}
+	return *m.ConsumerId
+}
+
+// CommandCloseProducer is the Go struct for the CommandCloseProducer protobuf message.
+type CommandCloseProducer struct {
+	ProducerId           *uint64  `protobuf:"varint,1,req,name=producer_id,json=producerId" json:"producer_id,omitempty"`
+	RequestId            *uint64  `protobuf:"varint,2,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+// Reset overwrites the receiver with a zero-valued CommandCloseProducer.
+func (m *CommandCloseProducer) Reset() {
+	*m = CommandCloseProducer{}
+}
+
+// String formats the message via proto.CompactTextString.
+func (m *CommandCloseProducer) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage is an empty marker method.
+func (*CommandCloseProducer) ProtoMessage() {}
+
+// Descriptor returns the file descriptor variable and the index path {25}.
+func (*CommandCloseProducer) Descriptor() ([]byte, []int) {
+	path := []int{25}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal forwards to the Unmarshal method of the message info below.
+func (m *CommandCloseProducer) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandCloseProducer.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to the Marshal method of the message info below.
+func (m *CommandCloseProducer) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandCloseProducer.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to the Merge method of the message info below, with the
+// receiver as destination.
+func (m *CommandCloseProducer) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandCloseProducer.Merge(m, src)
+}
+
+// XXX_Size forwards to the Size method of the message info below.
+func (m *CommandCloseProducer) XXX_Size() int {
+	return xxx_messageInfo_CommandCloseProducer.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to the DiscardUnknown method of the message info below.
+func (m *CommandCloseProducer) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandCloseProducer.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandCloseProducer is the message info the XXX_* methods
+// above delegate to.
+var xxx_messageInfo_CommandCloseProducer proto.InternalMessageInfo
+
+// GetProducerId returns *m.ProducerId, or 0 if m or the field is nil.
+func (m *CommandCloseProducer) GetProducerId() uint64 {
+	if m == nil || m.ProducerId == nil {
+		return 0
+	}
+	return *m.ProducerId
+}
+
+// GetRequestId returns *m.RequestId, or 0 if m or the field is nil.
+func (m *CommandCloseProducer) GetRequestId() uint64 {
+	if m == nil || m.RequestId == nil {
+		return 0
+	}
+	return *m.RequestId
+}
+
+// CommandCloseConsumer is the Go struct for the CommandCloseConsumer protobuf message.
+type CommandCloseConsumer struct {
+	ConsumerId           *uint64  `protobuf:"varint,1,req,name=consumer_id,json=consumerId" json:"consumer_id,omitempty"`
+	RequestId            *uint64  `protobuf:"varint,2,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+// Reset overwrites the receiver with a zero-valued CommandCloseConsumer.
+func (m *CommandCloseConsumer) Reset() {
+	*m = CommandCloseConsumer{}
+}
+
+// String formats the message via proto.CompactTextString.
+func (m *CommandCloseConsumer) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage is an empty marker method.
+func (*CommandCloseConsumer) ProtoMessage() {}
+
+// Descriptor returns the file descriptor variable and the index path {26}.
+func (*CommandCloseConsumer) Descriptor() ([]byte, []int) {
+	path := []int{26}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal forwards to the Unmarshal method of the message info below.
+func (m *CommandCloseConsumer) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandCloseConsumer.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to the Marshal method of the message info below.
+func (m *CommandCloseConsumer) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandCloseConsumer.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to the Merge method of the message info below, with the
+// receiver as destination.
+func (m *CommandCloseConsumer) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandCloseConsumer.Merge(m, src)
+}
+
+// XXX_Size forwards to the Size method of the message info below.
+func (m *CommandCloseConsumer) XXX_Size() int {
+	return xxx_messageInfo_CommandCloseConsumer.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to the DiscardUnknown method of the message info below.
+func (m *CommandCloseConsumer) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandCloseConsumer.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandCloseConsumer is the message info the XXX_* methods
+// above delegate to.
+var xxx_messageInfo_CommandCloseConsumer proto.InternalMessageInfo
+
+// GetConsumerId returns *m.ConsumerId, or 0 if m or the field is nil.
+func (m *CommandCloseConsumer) GetConsumerId() uint64 {
+	if m == nil || m.ConsumerId == nil {
+		return 0
+	}
+	return *m.ConsumerId
+}
+
+// GetRequestId returns *m.RequestId, or 0 if m or the field is nil.
+func (m *CommandCloseConsumer) GetRequestId() uint64 {
+	if m == nil || m.RequestId == nil {
+		return 0
+	}
+	return *m.RequestId
+}
+
+// CommandRedeliverUnacknowledgedMessages is the Go struct for the
+// CommandRedeliverUnacknowledgedMessages protobuf message.
+type CommandRedeliverUnacknowledgedMessages struct {
+	ConsumerId           *uint64          `protobuf:"varint,1,req,name=consumer_id,json=consumerId" json:"consumer_id,omitempty"`
+	MessageIds           []*MessageIdData `protobuf:"bytes,2,rep,name=message_ids,json=messageIds" json:"message_ids,omitempty"`
+	XXX_NoUnkeyedLiteral struct{}         `json:"-"`
+	XXX_unrecognized     []byte           `json:"-"`
+	XXX_sizecache        int32            `json:"-"`
+}
+
+// Reset overwrites the receiver with a zero-valued
+// CommandRedeliverUnacknowledgedMessages.
+func (m *CommandRedeliverUnacknowledgedMessages) Reset() {
+	var zero CommandRedeliverUnacknowledgedMessages
+	*m = zero
+}
+
+// String formats the message via proto.CompactTextString.
+func (m *CommandRedeliverUnacknowledgedMessages) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage is an empty marker method.
+func (*CommandRedeliverUnacknowledgedMessages) ProtoMessage() {}
+
+// Descriptor returns the file descriptor variable and the index path {27}.
+func (*CommandRedeliverUnacknowledgedMessages) Descriptor() ([]byte, []int) {
+	path := []int{27}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal forwards to the Unmarshal method of the message info below.
+func (m *CommandRedeliverUnacknowledgedMessages) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandRedeliverUnacknowledgedMessages.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to the Marshal method of the message info below.
+func (m *CommandRedeliverUnacknowledgedMessages) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandRedeliverUnacknowledgedMessages.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to the Merge method of the message info below, with the
+// receiver as destination.
+func (m *CommandRedeliverUnacknowledgedMessages) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandRedeliverUnacknowledgedMessages.Merge(m, src)
+}
+
+// XXX_Size forwards to the Size method of the message info below.
+func (m *CommandRedeliverUnacknowledgedMessages) XXX_Size() int {
+	return xxx_messageInfo_CommandRedeliverUnacknowledgedMessages.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to the DiscardUnknown method of the message info below.
+func (m *CommandRedeliverUnacknowledgedMessages) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandRedeliverUnacknowledgedMessages.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandRedeliverUnacknowledgedMessages is the message info
+// the XXX_* methods above delegate to.
+var xxx_messageInfo_CommandRedeliverUnacknowledgedMessages proto.InternalMessageInfo
+
+// GetConsumerId returns *m.ConsumerId, or 0 if m or the field is nil.
+func (m *CommandRedeliverUnacknowledgedMessages) GetConsumerId() uint64 {
+	if m == nil || m.ConsumerId == nil {
+		return 0
+	}
+	return *m.ConsumerId
+}
+
+// GetMessageIds returns m.MessageIds, or nil for a nil receiver.
+func (m *CommandRedeliverUnacknowledgedMessages) GetMessageIds() []*MessageIdData {
+	if m == nil {
+		return nil
+	}
+	return m.MessageIds
+}
+
+// CommandSuccess is the Go struct for the CommandSuccess protobuf message.
+type CommandSuccess struct {
+	RequestId            *uint64  `protobuf:"varint,1,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+	Schema               *Schema  `protobuf:"bytes,2,opt,name=schema" json:"schema,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+// Reset overwrites the receiver with a zero-valued CommandSuccess.
+func (m *CommandSuccess) Reset() {
+	*m = CommandSuccess{}
+}
+
+// String formats the message via proto.CompactTextString.
+func (m *CommandSuccess) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage is an empty marker method.
+func (*CommandSuccess) ProtoMessage() {}
+
+// Descriptor returns the file descriptor variable and the index path {28}.
+func (*CommandSuccess) Descriptor() ([]byte, []int) {
+	path := []int{28}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal forwards to the Unmarshal method of the message info below.
+func (m *CommandSuccess) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandSuccess.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to the Marshal method of the message info below.
+func (m *CommandSuccess) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandSuccess.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to the Merge method of the message info below, with the
+// receiver as destination.
+func (m *CommandSuccess) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandSuccess.Merge(m, src)
+}
+
+// XXX_Size forwards to the Size method of the message info below.
+func (m *CommandSuccess) XXX_Size() int {
+	return xxx_messageInfo_CommandSuccess.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to the DiscardUnknown method of the message info below.
+func (m *CommandSuccess) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandSuccess.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandSuccess is the message info the XXX_* methods above
+// delegate to.
+var xxx_messageInfo_CommandSuccess proto.InternalMessageInfo
+
+// GetRequestId returns *m.RequestId, or 0 if m or the field is nil.
+func (m *CommandSuccess) GetRequestId() uint64 {
+	if m == nil || m.RequestId == nil {
+		return 0
+	}
+	return *m.RequestId
+}
+
+// GetSchema returns m.Schema, or nil for a nil receiver.
+func (m *CommandSuccess) GetSchema() *Schema {
+	if m == nil {
+		return nil
+	}
+	return m.Schema
+}
+
+// CommandProducerSuccess is the response from CommandProducer.
+type CommandProducerSuccess struct {
+	RequestId    *uint64 `protobuf:"varint,1,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+	ProducerName *string `protobuf:"bytes,2,req,name=producer_name,json=producerName" json:"producer_name,omitempty"`
+	// The last sequence id that was stored by this producer in the previous
+	// session. This is only meaningful if deduplication has been enabled.
+	LastSequenceId       *int64   `protobuf:"varint,3,opt,name=last_sequence_id,json=lastSequenceId,def=-1" json:"last_sequence_id,omitempty"`
+	SchemaVersion        []byte   `protobuf:"bytes,4,opt,name=schema_version,json=schemaVersion" json:"schema_version,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+// Reset overwrites *m with an empty CommandProducerSuccess.
+func (m *CommandProducerSuccess) Reset() {
+	*m = CommandProducerSuccess{}
+}
+
+// String returns proto.CompactTextString(m).
+func (m *CommandProducerSuccess) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage is a no-op marker method.
+func (*CommandProducerSuccess) ProtoMessage() {}
+
+// Descriptor returns fileDescriptor_PulsarApi_9ff3bd5e091a4809 together with
+// the index path []int{29}.
+func (*CommandProducerSuccess) Descriptor() ([]byte, []int) {
+	path := []int{29}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal forwards to xxx_messageInfo_CommandProducerSuccess.Unmarshal.
+func (m *CommandProducerSuccess) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandProducerSuccess.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to xxx_messageInfo_CommandProducerSuccess.Marshal.
+func (m *CommandProducerSuccess) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandProducerSuccess.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to xxx_messageInfo_CommandProducerSuccess.Merge with m
+// as the destination and src as the source.
+func (m *CommandProducerSuccess) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandProducerSuccess.Merge(m, src)
+}
+
+// XXX_Size forwards to xxx_messageInfo_CommandProducerSuccess.Size.
+func (m *CommandProducerSuccess) XXX_Size() int {
+	return xxx_messageInfo_CommandProducerSuccess.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to xxx_messageInfo_CommandProducerSuccess.DiscardUnknown.
+func (m *CommandProducerSuccess) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandProducerSuccess.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandProducerSuccess is the value the XXX_* methods delegate to.
+var xxx_messageInfo_CommandProducerSuccess proto.InternalMessageInfo
+
+// Default_CommandProducerSuccess_LastSequenceId is what GetLastSequenceId
+// returns when the field is unset.
+const Default_CommandProducerSuccess_LastSequenceId int64 = -1
+
+// GetRequestId returns *m.RequestId, or 0 when m or the field is nil.
+func (m *CommandProducerSuccess) GetRequestId() uint64 {
+	if m == nil || m.RequestId == nil {
+		return 0
+	}
+	return *m.RequestId
+}
+
+// GetProducerName returns *m.ProducerName, or "" when m or the field is nil.
+func (m *CommandProducerSuccess) GetProducerName() string {
+	if m == nil || m.ProducerName == nil {
+		return ""
+	}
+	return *m.ProducerName
+}
+
+// GetLastSequenceId returns *m.LastSequenceId, or
+// Default_CommandProducerSuccess_LastSequenceId when m or the field is nil.
+func (m *CommandProducerSuccess) GetLastSequenceId() int64 {
+	if m == nil || m.LastSequenceId == nil {
+		return Default_CommandProducerSuccess_LastSequenceId
+	}
+	return *m.LastSequenceId
+}
+
+// GetSchemaVersion returns m.SchemaVersion, or nil when m is nil.
+func (m *CommandProducerSuccess) GetSchemaVersion() []byte {
+	if m == nil {
+		return nil
+	}
+	return m.SchemaVersion
+}
+
+// CommandError carries three required fields: request_id, a ServerError
+// code and a message string.
+type CommandError struct {
+	RequestId            *uint64      `protobuf:"varint,1,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+	Error                *ServerError `protobuf:"varint,2,req,name=error,enum=pulsar.proto.ServerError" json:"error,omitempty"`
+	Message              *string      `protobuf:"bytes,3,req,name=message" json:"message,omitempty"`
+	XXX_NoUnkeyedLiteral struct{}     `json:"-"`
+	XXX_unrecognized     []byte       `json:"-"`
+	XXX_sizecache        int32        `json:"-"`
+}
+
+// Reset overwrites *m with an empty CommandError.
+func (m *CommandError) Reset() {
+	*m = CommandError{}
+}
+
+// String returns proto.CompactTextString(m).
+func (m *CommandError) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage is a no-op marker method.
+func (*CommandError) ProtoMessage() {}
+
+// Descriptor returns fileDescriptor_PulsarApi_9ff3bd5e091a4809 together with
+// the index path []int{30}.
+func (*CommandError) Descriptor() ([]byte, []int) {
+	path := []int{30}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal forwards to xxx_messageInfo_CommandError.Unmarshal.
+func (m *CommandError) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandError.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to xxx_messageInfo_CommandError.Marshal.
+func (m *CommandError) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandError.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to xxx_messageInfo_CommandError.Merge with m as the
+// destination and src as the source.
+func (m *CommandError) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandError.Merge(m, src)
+}
+
+// XXX_Size forwards to xxx_messageInfo_CommandError.Size.
+func (m *CommandError) XXX_Size() int {
+	return xxx_messageInfo_CommandError.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to xxx_messageInfo_CommandError.DiscardUnknown.
+func (m *CommandError) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandError.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandError is the value the XXX_* methods delegate to.
+var xxx_messageInfo_CommandError proto.InternalMessageInfo
+
+// GetRequestId returns *m.RequestId, or 0 when m or the field is nil.
+func (m *CommandError) GetRequestId() uint64 {
+	if m == nil || m.RequestId == nil {
+		return 0
+	}
+	return *m.RequestId
+}
+
+// GetError returns *m.Error, or ServerError_UnknownError when m or the
+// field is nil.
+func (m *CommandError) GetError() ServerError {
+	if m == nil || m.Error == nil {
+		return ServerError_UnknownError
+	}
+	return *m.Error
+}
+
+// GetMessage returns *m.Message, or "" when m or the field is nil.
+func (m *CommandError) GetMessage() string {
+	if m == nil || m.Message == nil {
+		return ""
+	}
+	return *m.Message
+}
+
+// CommandPing is a command to probe the state of the connection.
+// When either the client or the broker doesn't receive commands for a
+// certain amount of time, it will send a Ping probe.
+type CommandPing struct {
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+// Reset overwrites *m with an empty CommandPing.
+func (m *CommandPing) Reset() {
+	*m = CommandPing{}
+}
+
+// String returns proto.CompactTextString(m).
+func (m *CommandPing) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage is a no-op marker method.
+func (*CommandPing) ProtoMessage() {}
+
+// Descriptor returns fileDescriptor_PulsarApi_9ff3bd5e091a4809 together with
+// the index path []int{31}.
+func (*CommandPing) Descriptor() ([]byte, []int) {
+	path := []int{31}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal forwards to xxx_messageInfo_CommandPing.Unmarshal.
+func (m *CommandPing) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandPing.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to xxx_messageInfo_CommandPing.Marshal.
+func (m *CommandPing) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandPing.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to xxx_messageInfo_CommandPing.Merge with m as the
+// destination and src as the source.
+func (m *CommandPing) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandPing.Merge(m, src)
+}
+
+// XXX_Size forwards to xxx_messageInfo_CommandPing.Size.
+func (m *CommandPing) XXX_Size() int {
+	return xxx_messageInfo_CommandPing.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to xxx_messageInfo_CommandPing.DiscardUnknown.
+func (m *CommandPing) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandPing.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandPing is the value the XXX_* methods delegate to.
+var xxx_messageInfo_CommandPing proto.InternalMessageInfo
+
+// CommandPong declares no message fields of its own.
+// NOTE(review): presumably the reply to CommandPing; confirm against the
+// protocol definition.
+type CommandPong struct {
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+// Reset overwrites *m with an empty CommandPong.
+func (m *CommandPong) Reset() {
+	*m = CommandPong{}
+}
+
+// String returns proto.CompactTextString(m).
+func (m *CommandPong) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage is a no-op marker method.
+func (*CommandPong) ProtoMessage() {}
+
+// Descriptor returns fileDescriptor_PulsarApi_9ff3bd5e091a4809 together with
+// the index path []int{32}.
+func (*CommandPong) Descriptor() ([]byte, []int) {
+	path := []int{32}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal forwards to xxx_messageInfo_CommandPong.Unmarshal.
+func (m *CommandPong) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandPong.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to xxx_messageInfo_CommandPong.Marshal.
+func (m *CommandPong) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandPong.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to xxx_messageInfo_CommandPong.Merge with m as the
+// destination and src as the source.
+func (m *CommandPong) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandPong.Merge(m, src)
+}
+
+// XXX_Size forwards to xxx_messageInfo_CommandPong.Size.
+func (m *CommandPong) XXX_Size() int {
+	return xxx_messageInfo_CommandPong.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to xxx_messageInfo_CommandPong.DiscardUnknown.
+func (m *CommandPong) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandPong.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandPong is the value the XXX_* methods delegate to.
+var xxx_messageInfo_CommandPong proto.InternalMessageInfo
+
+// CommandConsumerStats carries a required request_id (field 1) and a
+// required consumer_id (field 4); fields 2 and 3 are commented out.
+type CommandConsumerStats struct {
+	RequestId *uint64 `protobuf:"varint,1,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+	// required string topic_name         = 2;
+	// required string subscription_name  = 3;
+	ConsumerId           *uint64  `protobuf:"varint,4,req,name=consumer_id,json=consumerId" json:"consumer_id,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+// Reset overwrites *m with an empty CommandConsumerStats.
+func (m *CommandConsumerStats) Reset() {
+	*m = CommandConsumerStats{}
+}
+
+// String returns proto.CompactTextString(m).
+func (m *CommandConsumerStats) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage is a no-op marker method.
+func (*CommandConsumerStats) ProtoMessage() {}
+
+// Descriptor returns fileDescriptor_PulsarApi_9ff3bd5e091a4809 together with
+// the index path []int{33}.
+func (*CommandConsumerStats) Descriptor() ([]byte, []int) {
+	path := []int{33}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal forwards to xxx_messageInfo_CommandConsumerStats.Unmarshal.
+func (m *CommandConsumerStats) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandConsumerStats.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to xxx_messageInfo_CommandConsumerStats.Marshal.
+func (m *CommandConsumerStats) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandConsumerStats.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to xxx_messageInfo_CommandConsumerStats.Merge with m as
+// the destination and src as the source.
+func (m *CommandConsumerStats) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandConsumerStats.Merge(m, src)
+}
+
+// XXX_Size forwards to xxx_messageInfo_CommandConsumerStats.Size.
+func (m *CommandConsumerStats) XXX_Size() int {
+	return xxx_messageInfo_CommandConsumerStats.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to xxx_messageInfo_CommandConsumerStats.DiscardUnknown.
+func (m *CommandConsumerStats) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandConsumerStats.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandConsumerStats is the value the XXX_* methods delegate to.
+var xxx_messageInfo_CommandConsumerStats proto.InternalMessageInfo
+
+// GetRequestId returns *m.RequestId, or 0 when m or the field is nil.
+func (m *CommandConsumerStats) GetRequestId() uint64 {
+	if m == nil || m.RequestId == nil {
+		return 0
+	}
+	return *m.RequestId
+}
+
+// GetConsumerId returns *m.ConsumerId, or 0 when m or the field is nil.
+func (m *CommandConsumerStats) GetConsumerId() uint64 {
+	if m == nil || m.ConsumerId == nil {
+		return 0
+	}
+	return *m.ConsumerId
+}
+
+// CommandConsumerStatsResponse carries a required request_id, an optional
+// error code and message, and a set of optional per-consumer statistics.
+type CommandConsumerStatsResponse struct {
+	RequestId    *uint64      `protobuf:"varint,1,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+	ErrorCode    *ServerError `protobuf:"varint,2,opt,name=error_code,json=errorCode,enum=pulsar.proto.ServerError" json:"error_code,omitempty"`
+	ErrorMessage *string      `protobuf:"bytes,3,opt,name=error_message,json=errorMessage" json:"error_message,omitempty"`
+	// Total rate of messages delivered to the consumer. msg/s
+	MsgRateOut *float64 `protobuf:"fixed64,4,opt,name=msgRateOut" json:"msgRateOut,omitempty"`
+	// Total throughput delivered to the consumer. bytes/s
+	MsgThroughputOut *float64 `protobuf:"fixed64,5,opt,name=msgThroughputOut" json:"msgThroughputOut,omitempty"`
+	// Total rate of messages redelivered by this consumer. msg/s
+	MsgRateRedeliver *float64 `protobuf:"fixed64,6,opt,name=msgRateRedeliver" json:"msgRateRedeliver,omitempty"`
+	// Name of the consumer
+	ConsumerName *string `protobuf:"bytes,7,opt,name=consumerName" json:"consumerName,omitempty"`
+	// Number of available message permits for the consumer
+	AvailablePermits *uint64 `protobuf:"varint,8,opt,name=availablePermits" json:"availablePermits,omitempty"`
+	// Number of unacknowledged messages for the consumer
+	UnackedMessages *uint64 `protobuf:"varint,9,opt,name=unackedMessages" json:"unackedMessages,omitempty"`
+	// Flag to verify if consumer is blocked due to reaching threshold of unacked messages
+	BlockedConsumerOnUnackedMsgs *bool `protobuf:"varint,10,opt,name=blockedConsumerOnUnackedMsgs" json:"blockedConsumerOnUnackedMsgs,omitempty"`
+	// Address of this consumer
+	Address *string `protobuf:"bytes,11,opt,name=address" json:"address,omitempty"`
+	// Timestamp of connection
+	ConnectedSince *string `protobuf:"bytes,12,opt,name=connectedSince" json:"connectedSince,omitempty"`
+	// Whether this subscription is Exclusive or Shared or Failover
+	Type *string `protobuf:"bytes,13,opt,name=type" json:"type,omitempty"`
+	// Total rate of messages expired on this subscription. msg/s
+	MsgRateExpired *float64 `protobuf:"fixed64,14,opt,name=msgRateExpired" json:"msgRateExpired,omitempty"`
+	// Number of messages in the subscription backlog
+	MsgBacklog           *uint64  `protobuf:"varint,15,opt,name=msgBacklog" json:"msgBacklog,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+// Reset overwrites *m with an empty CommandConsumerStatsResponse.
+func (m *CommandConsumerStatsResponse) Reset() {
+	*m = CommandConsumerStatsResponse{}
+}
+
+// String returns proto.CompactTextString(m).
+func (m *CommandConsumerStatsResponse) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage is a no-op marker method.
+func (*CommandConsumerStatsResponse) ProtoMessage() {}
+
+// Descriptor returns fileDescriptor_PulsarApi_9ff3bd5e091a4809 together with
+// the index path []int{34}.
+func (*CommandConsumerStatsResponse) Descriptor() ([]byte, []int) {
+	path := []int{34}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal forwards to xxx_messageInfo_CommandConsumerStatsResponse.Unmarshal.
+func (m *CommandConsumerStatsResponse) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandConsumerStatsResponse.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to xxx_messageInfo_CommandConsumerStatsResponse.Marshal.
+func (m *CommandConsumerStatsResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandConsumerStatsResponse.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to xxx_messageInfo_CommandConsumerStatsResponse.Merge
+// with m as the destination and src as the source.
+func (m *CommandConsumerStatsResponse) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandConsumerStatsResponse.Merge(m, src)
+}
+
+// XXX_Size forwards to xxx_messageInfo_CommandConsumerStatsResponse.Size.
+func (m *CommandConsumerStatsResponse) XXX_Size() int {
+	return xxx_messageInfo_CommandConsumerStatsResponse.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to xxx_messageInfo_CommandConsumerStatsResponse.DiscardUnknown.
+func (m *CommandConsumerStatsResponse) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandConsumerStatsResponse.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandConsumerStatsResponse is the value the XXX_*
+// methods delegate to.
+var xxx_messageInfo_CommandConsumerStatsResponse proto.InternalMessageInfo
+
+// GetRequestId returns *m.RequestId, or 0 when m or the field is nil.
+func (m *CommandConsumerStatsResponse) GetRequestId() uint64 {
+	if m == nil || m.RequestId == nil {
+		return 0
+	}
+	return *m.RequestId
+}
+
+// GetErrorCode returns *m.ErrorCode, or ServerError_UnknownError when m or
+// the field is nil.
+func (m *CommandConsumerStatsResponse) GetErrorCode() ServerError {
+	if m == nil || m.ErrorCode == nil {
+		return ServerError_UnknownError
+	}
+	return *m.ErrorCode
+}
+
+// GetErrorMessage returns *m.ErrorMessage, or "" when m or the field is nil.
+func (m *CommandConsumerStatsResponse) GetErrorMessage() string {
+	if m == nil || m.ErrorMessage == nil {
+		return ""
+	}
+	return *m.ErrorMessage
+}
+
+// GetMsgRateOut returns *m.MsgRateOut, or 0 when m or the field is nil.
+func (m *CommandConsumerStatsResponse) GetMsgRateOut() float64 {
+	if m == nil || m.MsgRateOut == nil {
+		return 0
+	}
+	return *m.MsgRateOut
+}
+
+// GetMsgThroughputOut returns *m.MsgThroughputOut, or 0 when m or the field
+// is nil.
+func (m *CommandConsumerStatsResponse) GetMsgThroughputOut() float64 {
+	if m == nil || m.MsgThroughputOut == nil {
+		return 0
+	}
+	return *m.MsgThroughputOut
+}
+
+// GetMsgRateRedeliver returns *m.MsgRateRedeliver, or 0 when m or the field
+// is nil.
+func (m *CommandConsumerStatsResponse) GetMsgRateRedeliver() float64 {
+	if m == nil || m.MsgRateRedeliver == nil {
+		return 0
+	}
+	return *m.MsgRateRedeliver
+}
+
+// GetConsumerName returns *m.ConsumerName, or "" when m or the field is nil.
+func (m *CommandConsumerStatsResponse) GetConsumerName() string {
+	if m == nil || m.ConsumerName == nil {
+		return ""
+	}
+	return *m.ConsumerName
+}
+
+// GetAvailablePermits returns *m.AvailablePermits, or 0 when m or the field
+// is nil.
+func (m *CommandConsumerStatsResponse) GetAvailablePermits() uint64 {
+	if m == nil || m.AvailablePermits == nil {
+		return 0
+	}
+	return *m.AvailablePermits
+}
+
+// GetUnackedMessages returns *m.UnackedMessages, or 0 when m or the field
+// is nil.
+func (m *CommandConsumerStatsResponse) GetUnackedMessages() uint64 {
+	if m == nil || m.UnackedMessages == nil {
+		return 0
+	}
+	return *m.UnackedMessages
+}
+
+// GetBlockedConsumerOnUnackedMsgs returns *m.BlockedConsumerOnUnackedMsgs,
+// or false when m or the field is nil.
+func (m *CommandConsumerStatsResponse) GetBlockedConsumerOnUnackedMsgs() bool {
+	if m == nil || m.BlockedConsumerOnUnackedMsgs == nil {
+		return false
+	}
+	return *m.BlockedConsumerOnUnackedMsgs
+}
+
+// GetAddress returns *m.Address, or "" when m or the field is nil.
+func (m *CommandConsumerStatsResponse) GetAddress() string {
+	if m == nil || m.Address == nil {
+		return ""
+	}
+	return *m.Address
+}
+
+// GetConnectedSince returns *m.ConnectedSince, or "" when m or the field
+// is nil.
+func (m *CommandConsumerStatsResponse) GetConnectedSince() string {
+	if m == nil || m.ConnectedSince == nil {
+		return ""
+	}
+	return *m.ConnectedSince
+}
+
+// GetType returns *m.Type, or "" when m or the field is nil.
+func (m *CommandConsumerStatsResponse) GetType() string {
+	if m == nil || m.Type == nil {
+		return ""
+	}
+	return *m.Type
+}
+
+// GetMsgRateExpired returns *m.MsgRateExpired, or 0 when m or the field
+// is nil.
+func (m *CommandConsumerStatsResponse) GetMsgRateExpired() float64 {
+	if m == nil || m.MsgRateExpired == nil {
+		return 0
+	}
+	return *m.MsgRateExpired
+}
+
+// GetMsgBacklog returns *m.MsgBacklog, or 0 when m or the field is nil.
+func (m *CommandConsumerStatsResponse) GetMsgBacklog() uint64 {
+	if m == nil || m.MsgBacklog == nil {
+		return 0
+	}
+	return *m.MsgBacklog
+}
+
+// CommandGetLastMessageId carries a required consumer_id and a required
+// request_id.
+type CommandGetLastMessageId struct {
+	ConsumerId           *uint64  `protobuf:"varint,1,req,name=consumer_id,json=consumerId" json:"consumer_id,omitempty"`
+	RequestId            *uint64  `protobuf:"varint,2,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+// Reset overwrites *m with an empty CommandGetLastMessageId.
+func (m *CommandGetLastMessageId) Reset() {
+	*m = CommandGetLastMessageId{}
+}
+
+// String returns proto.CompactTextString(m).
+func (m *CommandGetLastMessageId) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage is a no-op marker method.
+func (*CommandGetLastMessageId) ProtoMessage() {}
+
+// Descriptor returns fileDescriptor_PulsarApi_9ff3bd5e091a4809 together with
+// the index path []int{35}.
+func (*CommandGetLastMessageId) Descriptor() ([]byte, []int) {
+	path := []int{35}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal forwards to xxx_messageInfo_CommandGetLastMessageId.Unmarshal.
+func (m *CommandGetLastMessageId) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandGetLastMessageId.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to xxx_messageInfo_CommandGetLastMessageId.Marshal.
+func (m *CommandGetLastMessageId) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandGetLastMessageId.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to xxx_messageInfo_CommandGetLastMessageId.Merge with m
+// as the destination and src as the source.
+func (m *CommandGetLastMessageId) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandGetLastMessageId.Merge(m, src)
+}
+
+// XXX_Size forwards to xxx_messageInfo_CommandGetLastMessageId.Size.
+func (m *CommandGetLastMessageId) XXX_Size() int {
+	return xxx_messageInfo_CommandGetLastMessageId.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to xxx_messageInfo_CommandGetLastMessageId.DiscardUnknown.
+func (m *CommandGetLastMessageId) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandGetLastMessageId.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandGetLastMessageId is the value the XXX_* methods delegate to.
+var xxx_messageInfo_CommandGetLastMessageId proto.InternalMessageInfo
+
+// GetConsumerId returns *m.ConsumerId, or 0 when m or the field is nil.
+func (m *CommandGetLastMessageId) GetConsumerId() uint64 {
+	if m == nil || m.ConsumerId == nil {
+		return 0
+	}
+	return *m.ConsumerId
+}
+
+// GetRequestId returns *m.RequestId, or 0 when m or the field is nil.
+func (m *CommandGetLastMessageId) GetRequestId() uint64 {
+	if m == nil || m.RequestId == nil {
+		return 0
+	}
+	return *m.RequestId
+}
+
+// CommandGetLastMessageIdResponse carries a required last_message_id
+// (a MessageIdData) and a required request_id.
+type CommandGetLastMessageIdResponse struct {
+	LastMessageId        *MessageIdData `protobuf:"bytes,1,req,name=last_message_id,json=lastMessageId" json:"last_message_id,omitempty"`
+	RequestId            *uint64        `protobuf:"varint,2,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+	XXX_NoUnkeyedLiteral struct{}       `json:"-"`
+	XXX_unrecognized     []byte         `json:"-"`
+	XXX_sizecache        int32          `json:"-"`
+}
+
+// Reset overwrites *m with an empty CommandGetLastMessageIdResponse.
+func (m *CommandGetLastMessageIdResponse) Reset() {
+	*m = CommandGetLastMessageIdResponse{}
+}
+
+// String returns proto.CompactTextString(m).
+func (m *CommandGetLastMessageIdResponse) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage is a no-op marker method.
+func (*CommandGetLastMessageIdResponse) ProtoMessage() {}
+
+// Descriptor returns fileDescriptor_PulsarApi_9ff3bd5e091a4809 together with
+// the index path []int{36}.
+func (*CommandGetLastMessageIdResponse) Descriptor() ([]byte, []int) {
+	path := []int{36}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal forwards to xxx_messageInfo_CommandGetLastMessageIdResponse.Unmarshal.
+func (m *CommandGetLastMessageIdResponse) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandGetLastMessageIdResponse.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to xxx_messageInfo_CommandGetLastMessageIdResponse.Marshal.
+func (m *CommandGetLastMessageIdResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandGetLastMessageIdResponse.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to xxx_messageInfo_CommandGetLastMessageIdResponse.Merge
+// with m as the destination and src as the source.
+func (m *CommandGetLastMessageIdResponse) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandGetLastMessageIdResponse.Merge(m, src)
+}
+
+// XXX_Size forwards to xxx_messageInfo_CommandGetLastMessageIdResponse.Size.
+func (m *CommandGetLastMessageIdResponse) XXX_Size() int {
+	return xxx_messageInfo_CommandGetLastMessageIdResponse.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to xxx_messageInfo_CommandGetLastMessageIdResponse.DiscardUnknown.
+func (m *CommandGetLastMessageIdResponse) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandGetLastMessageIdResponse.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandGetLastMessageIdResponse is the value the XXX_*
+// methods delegate to.
+var xxx_messageInfo_CommandGetLastMessageIdResponse proto.InternalMessageInfo
+
+// GetLastMessageId returns m.LastMessageId, or nil when m is nil.
+func (m *CommandGetLastMessageIdResponse) GetLastMessageId() *MessageIdData {
+	if m == nil {
+		return nil
+	}
+	return m.LastMessageId
+}
+
+// GetRequestId returns *m.RequestId, or 0 when m or the field is nil.
+func (m *CommandGetLastMessageIdResponse) GetRequestId() uint64 {
+	if m == nil || m.RequestId == nil {
+		return 0
+	}
+	return *m.RequestId
+}
+
+// CommandGetTopicsOfNamespace carries a required request_id, a required
+// namespace string and an optional mode whose declared default is 0.
+type CommandGetTopicsOfNamespace struct {
+	RequestId            *uint64                           `protobuf:"varint,1,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+	Namespace            *string                           `protobuf:"bytes,2,req,name=namespace" json:"namespace,omitempty"`
+	Mode                 *CommandGetTopicsOfNamespace_Mode `protobuf:"varint,3,opt,name=mode,enum=pulsar.proto.CommandGetTopicsOfNamespace_Mode,def=0" json:"mode,omitempty"`
+	XXX_NoUnkeyedLiteral struct{}                          `json:"-"`
+	XXX_unrecognized     []byte                            `json:"-"`
+	XXX_sizecache        int32                             `json:"-"`
+}
+
+// Reset overwrites *m with an empty CommandGetTopicsOfNamespace.
+func (m *CommandGetTopicsOfNamespace) Reset() {
+	*m = CommandGetTopicsOfNamespace{}
+}
+
+// String returns proto.CompactTextString(m).
+func (m *CommandGetTopicsOfNamespace) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage is a no-op marker method.
+func (*CommandGetTopicsOfNamespace) ProtoMessage() {}
+
+// Descriptor returns fileDescriptor_PulsarApi_9ff3bd5e091a4809 together with
+// the index path []int{37}.
+func (*CommandGetTopicsOfNamespace) Descriptor() ([]byte, []int) {
+	path := []int{37}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal forwards to xxx_messageInfo_CommandGetTopicsOfNamespace.Unmarshal.
+func (m *CommandGetTopicsOfNamespace) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandGetTopicsOfNamespace.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to xxx_messageInfo_CommandGetTopicsOfNamespace.Marshal.
+func (m *CommandGetTopicsOfNamespace) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandGetTopicsOfNamespace.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to xxx_messageInfo_CommandGetTopicsOfNamespace.Merge
+// with m as the destination and src as the source.
+func (m *CommandGetTopicsOfNamespace) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandGetTopicsOfNamespace.Merge(m, src)
+}
+
+// XXX_Size forwards to xxx_messageInfo_CommandGetTopicsOfNamespace.Size.
+func (m *CommandGetTopicsOfNamespace) XXX_Size() int {
+	return xxx_messageInfo_CommandGetTopicsOfNamespace.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to xxx_messageInfo_CommandGetTopicsOfNamespace.DiscardUnknown.
+func (m *CommandGetTopicsOfNamespace) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandGetTopicsOfNamespace.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandGetTopicsOfNamespace is the value the XXX_* methods
+// delegate to.
+var xxx_messageInfo_CommandGetTopicsOfNamespace proto.InternalMessageInfo
+
+// Default_CommandGetTopicsOfNamespace_Mode is what GetMode returns when the
+// field is unset.
+const Default_CommandGetTopicsOfNamespace_Mode CommandGetTopicsOfNamespace_Mode = CommandGetTopicsOfNamespace_PERSISTENT
+
+// GetRequestId returns *m.RequestId, or 0 when m or the field is nil.
+func (m *CommandGetTopicsOfNamespace) GetRequestId() uint64 {
+	if m == nil || m.RequestId == nil {
+		return 0
+	}
+	return *m.RequestId
+}
+
+// GetNamespace returns *m.Namespace, or "" when m or the field is nil.
+func (m *CommandGetTopicsOfNamespace) GetNamespace() string {
+	if m == nil || m.Namespace == nil {
+		return ""
+	}
+	return *m.Namespace
+}
+
+// GetMode returns *m.Mode, or Default_CommandGetTopicsOfNamespace_Mode when
+// m or the field is nil.
+func (m *CommandGetTopicsOfNamespace) GetMode() CommandGetTopicsOfNamespace_Mode {
+	if m == nil || m.Mode == nil {
+		return Default_CommandGetTopicsOfNamespace_Mode
+	}
+	return *m.Mode
+}
+
+// CommandGetTopicsOfNamespaceResponse carries a required request_id and a
+// repeated list of topic strings.
+type CommandGetTopicsOfNamespaceResponse struct {
+	RequestId            *uint64  `protobuf:"varint,1,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+	Topics               []string `protobuf:"bytes,2,rep,name=topics" json:"topics,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+// Reset overwrites *m with an empty CommandGetTopicsOfNamespaceResponse.
+func (m *CommandGetTopicsOfNamespaceResponse) Reset() {
+	*m = CommandGetTopicsOfNamespaceResponse{}
+}
+
+// String returns proto.CompactTextString(m).
+func (m *CommandGetTopicsOfNamespaceResponse) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage is a no-op marker method.
+func (*CommandGetTopicsOfNamespaceResponse) ProtoMessage() {}
+
+// Descriptor returns fileDescriptor_PulsarApi_9ff3bd5e091a4809 together with
+// the index path []int{38}.
+func (*CommandGetTopicsOfNamespaceResponse) Descriptor() ([]byte, []int) {
+	path := []int{38}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal forwards to xxx_messageInfo_CommandGetTopicsOfNamespaceResponse.Unmarshal.
+func (m *CommandGetTopicsOfNamespaceResponse) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandGetTopicsOfNamespaceResponse.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to xxx_messageInfo_CommandGetTopicsOfNamespaceResponse.Marshal.
+func (m *CommandGetTopicsOfNamespaceResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandGetTopicsOfNamespaceResponse.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to xxx_messageInfo_CommandGetTopicsOfNamespaceResponse.Merge
+// with m as the destination and src as the source.
+func (m *CommandGetTopicsOfNamespaceResponse) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandGetTopicsOfNamespaceResponse.Merge(m, src)
+}
+
+// XXX_Size forwards to xxx_messageInfo_CommandGetTopicsOfNamespaceResponse.Size.
+func (m *CommandGetTopicsOfNamespaceResponse) XXX_Size() int {
+	return xxx_messageInfo_CommandGetTopicsOfNamespaceResponse.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to xxx_messageInfo_CommandGetTopicsOfNamespaceResponse.DiscardUnknown.
+func (m *CommandGetTopicsOfNamespaceResponse) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandGetTopicsOfNamespaceResponse.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandGetTopicsOfNamespaceResponse is the value the XXX_*
+// methods delegate to.
+var xxx_messageInfo_CommandGetTopicsOfNamespaceResponse proto.InternalMessageInfo
+
+// GetRequestId returns *m.RequestId, or 0 when m or the field is nil.
+func (m *CommandGetTopicsOfNamespaceResponse) GetRequestId() uint64 {
+	if m == nil || m.RequestId == nil {
+		return 0
+	}
+	return *m.RequestId
+}
+
+// GetTopics returns m.Topics, or nil when m is nil.
+func (m *CommandGetTopicsOfNamespaceResponse) GetTopics() []string {
+	if m == nil {
+		return nil
+	}
+	return m.Topics
+}
+
+// CommandGetSchema carries a required request_id, a required topic string
+// and an optional schema_version byte slice.
+type CommandGetSchema struct {
+	RequestId            *uint64  `protobuf:"varint,1,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+	Topic                *string  `protobuf:"bytes,2,req,name=topic" json:"topic,omitempty"`
+	SchemaVersion        []byte   `protobuf:"bytes,3,opt,name=schema_version,json=schemaVersion" json:"schema_version,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+// Reset overwrites *m with an empty CommandGetSchema.
+func (m *CommandGetSchema) Reset() {
+	*m = CommandGetSchema{}
+}
+
+// String returns proto.CompactTextString(m).
+func (m *CommandGetSchema) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage is a no-op marker method.
+func (*CommandGetSchema) ProtoMessage() {}
+
+// Descriptor returns fileDescriptor_PulsarApi_9ff3bd5e091a4809 together with
+// the index path []int{39}.
+func (*CommandGetSchema) Descriptor() ([]byte, []int) {
+	path := []int{39}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal forwards to xxx_messageInfo_CommandGetSchema.Unmarshal.
+func (m *CommandGetSchema) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandGetSchema.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to xxx_messageInfo_CommandGetSchema.Marshal.
+func (m *CommandGetSchema) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandGetSchema.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to xxx_messageInfo_CommandGetSchema.Merge with m as the
+// destination and src as the source.
+func (m *CommandGetSchema) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandGetSchema.Merge(m, src)
+}
+
+// XXX_Size forwards to xxx_messageInfo_CommandGetSchema.Size.
+func (m *CommandGetSchema) XXX_Size() int {
+	return xxx_messageInfo_CommandGetSchema.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to xxx_messageInfo_CommandGetSchema.DiscardUnknown.
+func (m *CommandGetSchema) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandGetSchema.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandGetSchema is the value the XXX_* methods delegate to.
+var xxx_messageInfo_CommandGetSchema proto.InternalMessageInfo
+
+// GetRequestId returns *m.RequestId, or 0 when m or the field is nil.
+func (m *CommandGetSchema) GetRequestId() uint64 {
+	if m == nil || m.RequestId == nil {
+		return 0
+	}
+	return *m.RequestId
+}
+
+// GetTopic returns *m.Topic, or "" when m or the field is nil.
+func (m *CommandGetSchema) GetTopic() string {
+	if m == nil || m.Topic == nil {
+		return ""
+	}
+	return *m.Topic
+}
+
+// GetSchemaVersion returns m.SchemaVersion, or nil when m is nil.
+func (m *CommandGetSchema) GetSchemaVersion() []byte {
+	if m == nil {
+		return nil
+	}
+	return m.SchemaVersion
+}
+
+// CommandGetSchemaResponse carries a required request_id plus optional
+// error code, error message, schema and schema_version fields.
+type CommandGetSchemaResponse struct {
+	RequestId            *uint64      `protobuf:"varint,1,req,name=request_id,json=requestId" json:"request_id,omitempty"`
+	ErrorCode            *ServerError `protobuf:"varint,2,opt,name=error_code,json=errorCode,enum=pulsar.proto.ServerError" json:"error_code,omitempty"`
+	ErrorMessage         *string      `protobuf:"bytes,3,opt,name=error_message,json=errorMessage" json:"error_message,omitempty"`
+	Schema               *Schema      `protobuf:"bytes,4,opt,name=schema" json:"schema,omitempty"`
+	SchemaVersion        []byte       `protobuf:"bytes,5,opt,name=schema_version,json=schemaVersion" json:"schema_version,omitempty"`
+	XXX_NoUnkeyedLiteral struct{}     `json:"-"`
+	XXX_unrecognized     []byte       `json:"-"`
+	XXX_sizecache        int32        `json:"-"`
+}
+
+// Reset overwrites *m with an empty CommandGetSchemaResponse.
+func (m *CommandGetSchemaResponse) Reset() {
+	*m = CommandGetSchemaResponse{}
+}
+
+// String returns proto.CompactTextString(m).
+func (m *CommandGetSchemaResponse) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage is a no-op marker method.
+func (*CommandGetSchemaResponse) ProtoMessage() {}
+
+// Descriptor returns fileDescriptor_PulsarApi_9ff3bd5e091a4809 together with
+// the index path []int{40}.
+func (*CommandGetSchemaResponse) Descriptor() ([]byte, []int) {
+	path := []int{40}
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, path
+}
+
+// XXX_Unmarshal forwards to xxx_messageInfo_CommandGetSchemaResponse.Unmarshal.
+func (m *CommandGetSchemaResponse) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_CommandGetSchemaResponse.Unmarshal(m, b)
+}
+
+// XXX_Marshal forwards to xxx_messageInfo_CommandGetSchemaResponse.Marshal.
+func (m *CommandGetSchemaResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_CommandGetSchemaResponse.Marshal(b, m, deterministic)
+}
+
+// XXX_Merge forwards to xxx_messageInfo_CommandGetSchemaResponse.Merge with
+// m as the destination and src as the source.
+func (m *CommandGetSchemaResponse) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_CommandGetSchemaResponse.Merge(m, src)
+}
+
+// XXX_Size forwards to xxx_messageInfo_CommandGetSchemaResponse.Size.
+func (m *CommandGetSchemaResponse) XXX_Size() int {
+	return xxx_messageInfo_CommandGetSchemaResponse.Size(m)
+}
+
+// XXX_DiscardUnknown forwards to xxx_messageInfo_CommandGetSchemaResponse.DiscardUnknown.
+func (m *CommandGetSchemaResponse) XXX_DiscardUnknown() {
+	xxx_messageInfo_CommandGetSchemaResponse.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_CommandGetSchemaResponse is the value the XXX_* methods
+// delegate to.
+var xxx_messageInfo_CommandGetSchemaResponse proto.InternalMessageInfo
+
+// GetRequestId returns *m.RequestId, or 0 when m or the field is nil.
+func (m *CommandGetSchemaResponse) GetRequestId() uint64 {
+	if m == nil || m.RequestId == nil {
+		return 0
+	}
+	return *m.RequestId
+}
+
+// GetErrorCode returns *m.ErrorCode, or ServerError_UnknownError when m or
+// the field is nil.
+func (m *CommandGetSchemaResponse) GetErrorCode() ServerError {
+	if m == nil || m.ErrorCode == nil {
+		return ServerError_UnknownError
+	}
+	return *m.ErrorCode
+}
+
+// GetErrorMessage returns *m.ErrorMessage, or "" when m or the field is nil.
+func (m *CommandGetSchemaResponse) GetErrorMessage() string {
+	if m == nil || m.ErrorMessage == nil {
+		return ""
+	}
+	return *m.ErrorMessage
+}
+
+// GetSchema returns m.Schema, or nil when m is nil.
+func (m *CommandGetSchemaResponse) GetSchema() *Schema {
+	if m == nil {
+		return nil
+	}
+	return m.Schema
+}
+
+// GetSchemaVersion returns m.SchemaVersion, or nil when m is nil.
+func (m *CommandGetSchemaResponse) GetSchemaVersion() []byte {
+	if m == nil {
+		return nil
+	}
+	return m.SchemaVersion
+}
+
+// BaseCommand has one required field, Type (a BaseCommand_Type enum, field 1),
+// followed by one optional pointer field per command message (fields 2-35).
+// NOTE(review): presumably Type tells the receiver which of the optional
+// fields is populated; confirm against the protocol definition.
+type BaseCommand struct {
+	Type                            *BaseCommand_Type                        `protobuf:"varint,1,req,name=type,enum=pulsar.proto.BaseCommand_Type" json:"type,omitempty"`
+	Connect                         *CommandConnect                          `protobuf:"bytes,2,opt,name=connect" json:"connect,omitempty"`
+	Connected                       *CommandConnected                        `protobuf:"bytes,3,opt,name=connected" json:"connected,omitempty"`
+	Subscribe                       *CommandSubscribe                        `protobuf:"bytes,4,opt,name=subscribe" json:"subscribe,omitempty"`
+	Producer                        *CommandProducer                         `protobuf:"bytes,5,opt,name=producer" json:"producer,omitempty"`
+	Send                            *CommandSend                             `protobuf:"bytes,6,opt,name=send" json:"send,omitempty"`
+	SendReceipt                     *CommandSendReceipt                      `protobuf:"bytes,7,opt,name=send_receipt,json=sendReceipt" json:"send_receipt,omitempty"`
+	SendError                       *CommandSendError                        `protobuf:"bytes,8,opt,name=send_error,json=sendError" json:"send_error,omitempty"`
+	Message                         *CommandMessage                          `protobuf:"bytes,9,opt,name=message" json:"message,omitempty"`
+	Ack                             *CommandAck                              `protobuf:"bytes,10,opt,name=ack" json:"ack,omitempty"`
+	Flow                            *CommandFlow                             `protobuf:"bytes,11,opt,name=flow" json:"flow,omitempty"`
+	Unsubscribe                     *CommandUnsubscribe                      `protobuf:"bytes,12,opt,name=unsubscribe" json:"unsubscribe,omitempty"`
+	Success                         *CommandSuccess                          `protobuf:"bytes,13,opt,name=success" json:"success,omitempty"`
+	Error                           *CommandError                            `protobuf:"bytes,14,opt,name=error" json:"error,omitempty"`
+	CloseProducer                   *CommandCloseProducer                    `protobuf:"bytes,15,opt,name=close_producer,json=closeProducer" json:"close_producer,omitempty"`
+	CloseConsumer                   *CommandCloseConsumer                    `protobuf:"bytes,16,opt,name=close_consumer,json=closeConsumer" json:"close_consumer,omitempty"`
+	ProducerSuccess                 *CommandProducerSuccess                  `protobuf:"bytes,17,opt,name=producer_success,json=producerSuccess" json:"producer_success,omitempty"`
+	Ping                            *CommandPing                             `protobuf:"bytes,18,opt,name=ping" json:"ping,omitempty"`
+	Pong                            *CommandPong                             `protobuf:"bytes,19,opt,name=pong" json:"pong,omitempty"`
+	RedeliverUnacknowledgedMessages *CommandRedeliverUnacknowledgedMessages  `protobuf:"bytes,20,opt,name=redeliverUnacknowledgedMessages" json:"redeliverUnacknowledgedMessages,omitempty"`
+	PartitionMetadata               *CommandPartitionedTopicMetadata         `protobuf:"bytes,21,opt,name=partitionMetadata" json:"partitionMetadata,omitempty"`
+	PartitionMetadataResponse       *CommandPartitionedTopicMetadataResponse `protobuf:"bytes,22,opt,name=partitionMetadataResponse" json:"partitionMetadataResponse,omitempty"`
+	LookupTopic                     *CommandLookupTopic                      `protobuf:"bytes,23,opt,name=lookupTopic" json:"lookupTopic,omitempty"`
+	LookupTopicResponse             *CommandLookupTopicResponse              `protobuf:"bytes,24,opt,name=lookupTopicResponse" json:"lookupTopicResponse,omitempty"`
+	ConsumerStats                   *CommandConsumerStats                    `protobuf:"bytes,25,opt,name=consumerStats" json:"consumerStats,omitempty"`
+	ConsumerStatsResponse           *CommandConsumerStatsResponse            `protobuf:"bytes,26,opt,name=consumerStatsResponse" json:"consumerStatsResponse,omitempty"`
+	ReachedEndOfTopic               *CommandReachedEndOfTopic                `protobuf:"bytes,27,opt,name=reachedEndOfTopic" json:"reachedEndOfTopic,omitempty"`
+	Seek                            *CommandSeek                             `protobuf:"bytes,28,opt,name=seek" json:"seek,omitempty"`
+	GetLastMessageId                *CommandGetLastMessageId                 `protobuf:"bytes,29,opt,name=getLastMessageId" json:"getLastMessageId,omitempty"`
+	GetLastMessageIdResponse        *CommandGetLastMessageIdResponse         `protobuf:"bytes,30,opt,name=getLastMessageIdResponse" json:"getLastMessageIdResponse,omitempty"`
+	ActiveConsumerChange            *CommandActiveConsumerChange             `protobuf:"bytes,31,opt,name=active_consumer_change,json=activeConsumerChange" json:"active_consumer_change,omitempty"`
+	GetTopicsOfNamespace            *CommandGetTopicsOfNamespace             `protobuf:"bytes,32,opt,name=getTopicsOfNamespace" json:"getTopicsOfNamespace,omitempty"`
+	GetTopicsOfNamespaceResponse    *CommandGetTopicsOfNamespaceResponse     `protobuf:"bytes,33,opt,name=getTopicsOfNamespaceResponse" json:"getTopicsOfNamespaceResponse,omitempty"`
+	GetSchema                       *CommandGetSchema                        `protobuf:"bytes,34,opt,name=getSchema" json:"getSchema,omitempty"`
+	GetSchemaResponse               *CommandGetSchemaResponse                `protobuf:"bytes,35,opt,name=getSchemaResponse" json:"getSchemaResponse,omitempty"`
+	XXX_NoUnkeyedLiteral            struct{}                                 `json:"-"`
+	XXX_unrecognized                []byte                                   `json:"-"`
+	XXX_sizecache                   int32                                    `json:"-"`
+}
+
+// Reset overwrites the receiver with a zero-valued BaseCommand.
+func (m *BaseCommand) Reset() {
+	*m = BaseCommand{}
+}
+
+// String returns the result of proto.CompactTextString for the receiver.
+func (m *BaseCommand) String() string {
+	return proto.CompactTextString(m)
+}
+
+// ProtoMessage has an empty body; presumably it exists only so that
+// *BaseCommand satisfies proto.Message (confirm against the proto package).
+func (*BaseCommand) ProtoMessage() {}
+// Descriptor returns the gzipped FileDescriptorProto bytes for this file
+// together with the index path []int{41}.
+// NOTE(review): 41 is presumably BaseCommand's message index within
+// PulsarApi.proto; confirm against the .proto definition.
+func (*BaseCommand) Descriptor() ([]byte, []int) {
+	return fileDescriptor_PulsarApi_9ff3bd5e091a4809, []int{41}
+}
+// XXX_Unmarshal passes the receiver and b to
+// xxx_messageInfo_BaseCommand.Unmarshal and returns its error unchanged.
+func (m *BaseCommand) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_BaseCommand.Unmarshal(m, b)
+}
+// XXX_Marshal passes b, the receiver and the deterministic flag to
+// xxx_messageInfo_BaseCommand.Marshal and returns its results unchanged.
+func (m *BaseCommand) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_BaseCommand.Marshal(b, m, deterministic)
+}
+// XXX_Merge forwards the receiver and src to xxx_messageInfo_BaseCommand.Merge.
+// The receiver is named m so that it matches the other BaseCommand methods.
+func (m *BaseCommand) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_BaseCommand.Merge(m, src)
+}
+// XXX_Size returns whatever xxx_messageInfo_BaseCommand.Size reports for the
+// receiver.
+func (m *BaseCommand) XXX_Size() int {
+	return xxx_messageInfo_BaseCommand.Size(m)
+}
+// XXX_DiscardUnknown calls xxx_messageInfo_BaseCommand.DiscardUnknown on the
+// receiver.
+func (m *BaseCommand) XXX_DiscardUnknown() {
+	xxx_messageInfo_BaseCommand.DiscardUnknown(m)
+}
+
+// xxx_messageInfo_BaseCommand is the proto.InternalMessageInfo value that the
+// XXX_* methods of BaseCommand delegate to.
+var xxx_messageInfo_BaseCommand proto.InternalMessageInfo
+
+// GetType returns the value Type points to. When the receiver or its Type
+// field is nil, it returns BaseCommand_CONNECT instead.
+func (m *BaseCommand) GetType() BaseCommand_Type {
+	if m == nil || m.Type == nil {
+		return BaseCommand_CONNECT
+	}
+	return *m.Type
+}
+
+// GetConnect returns the Connect field, or nil when the receiver is nil.
+func (m *BaseCommand) GetConnect() *CommandConnect {
+	if m == nil {
+		return nil
+	}
+	return m.Connect
+}
+
+// GetConnected returns the Connected field, or nil when the receiver is nil.
+func (m *BaseCommand) GetConnected() *CommandConnected {
+	if m == nil {
+		return nil
+	}
+	return m.Connected
+}
+
+// GetSubscribe returns the Subscribe field, or nil when the receiver is nil.
+func (m *BaseCommand) GetSubscribe() *CommandSubscribe {
+	if m == nil {
+		return nil
+	}
+	return m.Subscribe
+}
+
+// GetProducer returns the Producer field, or nil when the receiver is nil.
+func (m *BaseCommand) GetProducer() *CommandProducer {
+	if m == nil {
+		return nil
+	}
+	return m.Producer
+}
+
+// GetSend returns the Send field, or nil when the receiver is nil.
+func (m *BaseCommand) GetSend() *CommandSend {
+	if m == nil {
+		return nil
+	}
+	return m.Send
+}
+
+// GetSendReceipt returns the SendReceipt field, or nil when the receiver is nil.
+func (m *BaseCommand) GetSendReceipt() *CommandSendReceipt {
+	if m == nil {
+		return nil
+	}
+	return m.SendReceipt
+}
+
+// GetSendError returns the SendError field, or nil when the receiver is nil.
+func (m *BaseCommand) GetSendError() *CommandSendError {
+	if m == nil {
+		return nil
+	}
+	return m.SendError
+}
+
+// GetMessage returns the Message field, or nil when the receiver is nil.
+func (m *BaseCommand) GetMessage() *CommandMessage {
+	if m == nil {
+		return nil
+	}
+	return m.Message
+}
+
+// GetAck returns the Ack field, or nil when the receiver is nil.
+func (m *BaseCommand) GetAck() *CommandAck {
+	if m == nil {
+		return nil
+	}
+	return m.Ack
+}
+
+// GetFlow returns the Flow field, or nil when the receiver is nil.
+func (m *BaseCommand) GetFlow() *CommandFlow {
+	if m == nil {
+		return nil
+	}
+	return m.Flow
+}
+
+// GetUnsubscribe returns the Unsubscribe field, or nil when the receiver is nil.
+func (m *BaseCommand) GetUnsubscribe() *CommandUnsubscribe {
+	if m == nil {
+		return nil
+	}
+	return m.Unsubscribe
+}
+
+// GetSuccess returns the Success field, or nil when the receiver is nil.
+func (m *BaseCommand) GetSuccess() *CommandSuccess {
+	if m == nil {
+		return nil
+	}
+	return m.Success
+}
+
+// GetError returns the Error field, or nil when the receiver is nil.
+func (m *BaseCommand) GetError() *CommandError {
+	if m == nil {
+		return nil
+	}
+	return m.Error
+}
+
+// GetCloseProducer returns the CloseProducer field, or nil when the receiver
+// is nil.
+func (m *BaseCommand) GetCloseProducer() *CommandCloseProducer {
+	if m == nil {
+		return nil
+	}
+	return m.CloseProducer
+}
+
+// GetCloseConsumer returns the CloseConsumer field, or nil when the receiver
+// is nil.
+func (m *BaseCommand) GetCloseConsumer() *CommandCloseConsumer {
+	if m == nil {
+		return nil
+	}
+	return m.CloseConsumer
+}
+
+// GetProducerSuccess returns the ProducerSuccess field, or nil when the
+// receiver is nil.
+func (m *BaseCommand) GetProducerSuccess() *CommandProducerSuccess {
+	if m == nil {
+		return nil
+	}
+	return m.ProducerSuccess
+}
+
+// GetPing returns the Ping field, or nil when the receiver is nil.
+func (m *BaseCommand) GetPing() *CommandPing {
+	if m == nil {
+		return nil
+	}
+	return m.Ping
+}
+
+// GetPong returns the Pong field, or nil when the receiver is nil.
+func (m *BaseCommand) GetPong() *CommandPong {
+	if m == nil {
+		return nil
+	}
+	return m.Pong
+}
+
+// GetRedeliverUnacknowledgedMessages returns the
+// RedeliverUnacknowledgedMessages field, or nil when the receiver is nil.
+func (m *BaseCommand) GetRedeliverUnacknowledgedMessages() *CommandRedeliverUnacknowledgedMessages {
+	if m == nil {
+		return nil
+	}
+	return m.RedeliverUnacknowledgedMessages
+}
+
+// GetPartitionMetadata returns the PartitionMetadata field, or nil when the
+// receiver is nil.
+func (m *BaseCommand) GetPartitionMetadata() *CommandPartitionedTopicMetadata {
+	if m == nil {
+		return nil
+	}
+	return m.PartitionMetadata
+}
+
+// GetPartitionMetadataResponse returns the PartitionMetadataResponse field,
+// or nil when the receiver is nil.
+func (m *BaseCommand) GetPartitionMetadataResponse() *CommandPartitionedTopicMetadataResponse {
+	if m == nil {
+		return nil
+	}
+	return m.PartitionMetadataResponse
+}
+
+// GetLookupTopic returns the LookupTopic field, or nil when the receiver is nil.
+func (m *BaseCommand) GetLookupTopic() *CommandLookupTopic {
+	if m == nil {
+		return nil
+	}
+	return m.LookupTopic
+}
+
+// GetLookupTopicResponse returns the LookupTopicResponse field, or nil when
+// the receiver is nil.
+func (m *BaseCommand) GetLookupTopicResponse() *CommandLookupTopicResponse {
+	if m == nil {
+		return nil
+	}
+	return m.LookupTopicResponse
+}
+
+// GetConsumerStats returns the ConsumerStats field, or nil when the receiver
+// is nil.
+func (m *BaseCommand) GetConsumerStats() *CommandConsumerStats {
+	if m == nil {
+		return nil
+	}
+	return m.ConsumerStats
+}
+
+// GetConsumerStatsResponse returns the ConsumerStatsResponse field, or nil
+// when the receiver is nil.
+func (m *BaseCommand) GetConsumerStatsResponse() *CommandConsumerStatsResponse {
+	if m == nil {
+		return nil
+	}
+	return m.ConsumerStatsResponse
+}
+
+// GetReachedEndOfTopic returns the ReachedEndOfTopic field, or nil when the
+// receiver is nil.
+func (m *BaseCommand) GetReachedEndOfTopic() *CommandReachedEndOfTopic {
+	if m == nil {
+		return nil
+	}
+	return m.ReachedEndOfTopic
+}
+
+// GetSeek returns the Seek field, or nil when the receiver is nil.
+func (m *BaseCommand) GetSeek() *CommandSeek {
+	if m == nil {
+		return nil
+	}
+	return m.Seek
+}
+
+// GetGetLastMessageId returns the GetLastMessageId field, or nil when the
+// receiver is nil.
+func (m *BaseCommand) GetGetLastMessageId() *CommandGetLastMessageId {
+	if m == nil {
+		return nil
+	}
+	return m.GetLastMessageId
+}
+
+// GetGetLastMessageIdResponse returns the GetLastMessageIdResponse field, or
+// nil when the receiver is nil.
+func (m *BaseCommand) GetGetLastMessageIdResponse() *CommandGetLastMessageIdResponse {
+	if m == nil {
+		return nil
+	}
+	return m.GetLastMessageIdResponse
+}
+
+// GetActiveConsumerChange returns the ActiveConsumerChange field, or nil when
+// the receiver is nil.
+func (m *BaseCommand) GetActiveConsumerChange() *CommandActiveConsumerChange {
+	if m == nil {
+		return nil
+	}
+	return m.ActiveConsumerChange
+}
+
+// GetGetTopicsOfNamespace returns the GetTopicsOfNamespace field, or nil when
+// the receiver is nil.
+func (m *BaseCommand) GetGetTopicsOfNamespace() *CommandGetTopicsOfNamespace {
+	if m == nil {
+		return nil
+	}
+	return m.GetTopicsOfNamespace
+}
+
+// GetGetTopicsOfNamespaceResponse returns the GetTopicsOfNamespaceResponse
+// field, or nil when the receiver is nil.
+func (m *BaseCommand) GetGetTopicsOfNamespaceResponse() *CommandGetTopicsOfNamespaceResponse {
+	if m == nil {
+		return nil
+	}
+	return m.GetTopicsOfNamespaceResponse
+}
+
+// GetGetSchema returns the GetSchema field, or nil when the receiver is nil.
+func (m *BaseCommand) GetGetSchema() *CommandGetSchema {
+	if m == nil {
+		return nil
+	}
+	return m.GetSchema
+}
+
+// GetGetSchemaResponse returns the GetSchemaResponse field, or nil when the
+// receiver is nil.
+func (m *BaseCommand) GetGetSchemaResponse() *CommandGetSchemaResponse {
+	if m == nil {
+		return nil
+	}
+	return m.GetSchemaResponse
+}
+
+// init registers this file's message types and enums with the proto package.
+// Each message is registered via proto.RegisterType with a typed nil pointer
+// and its "pulsar.proto.*" name; each enum is registered via
+// proto.RegisterEnum with its generated _name and _value tables.
+func init() {
+	proto.RegisterType((*Schema)(nil), "pulsar.proto.Schema")
+	proto.RegisterType((*MessageIdData)(nil), "pulsar.proto.MessageIdData")
+	proto.RegisterType((*KeyValue)(nil), "pulsar.proto.KeyValue")
+	proto.RegisterType((*KeyLongValue)(nil), "pulsar.proto.KeyLongValue")
+	proto.RegisterType((*EncryptionKeys)(nil), "pulsar.proto.EncryptionKeys")
+	proto.RegisterType((*MessageMetadata)(nil), "pulsar.proto.MessageMetadata")
+	proto.RegisterType((*SingleMessageMetadata)(nil), "pulsar.proto.SingleMessageMetadata")
+	proto.RegisterType((*CommandConnect)(nil), "pulsar.proto.CommandConnect")
+	proto.RegisterType((*CommandConnected)(nil), "pulsar.proto.CommandConnected")
+	proto.RegisterType((*CommandSubscribe)(nil), "pulsar.proto.CommandSubscribe")
+	proto.RegisterType((*CommandPartitionedTopicMetadata)(nil), "pulsar.proto.CommandPartitionedTopicMetadata")
+	proto.RegisterType((*CommandPartitionedTopicMetadataResponse)(nil), "pulsar.proto.CommandPartitionedTopicMetadataResponse")
+	proto.RegisterType((*CommandLookupTopic)(nil), "pulsar.proto.CommandLookupTopic")
+	proto.RegisterType((*CommandLookupTopicResponse)(nil), "pulsar.proto.CommandLookupTopicResponse")
+	proto.RegisterType((*CommandProducer)(nil), "pulsar.proto.CommandProducer")
+	proto.RegisterType((*CommandSend)(nil), "pulsar.proto.CommandSend")
+	proto.RegisterType((*CommandSendReceipt)(nil), "pulsar.proto.CommandSendReceipt")
+	proto.RegisterType((*CommandSendError)(nil), "pulsar.proto.CommandSendError")
+	proto.RegisterType((*CommandMessage)(nil), "pulsar.proto.CommandMessage")
+	proto.RegisterType((*CommandAck)(nil), "pulsar.proto.CommandAck")
+	proto.RegisterType((*CommandActiveConsumerChange)(nil), "pulsar.proto.CommandActiveConsumerChange")
+	proto.RegisterType((*CommandFlow)(nil), "pulsar.proto.CommandFlow")
+	proto.RegisterType((*CommandUnsubscribe)(nil), "pulsar.proto.CommandUnsubscribe")
+	proto.RegisterType((*CommandSeek)(nil), "pulsar.proto.CommandSeek")
+	proto.RegisterType((*CommandReachedEndOfTopic)(nil), "pulsar.proto.CommandReachedEndOfTopic")
+	proto.RegisterType((*CommandCloseProducer)(nil), "pulsar.proto.CommandCloseProducer")
+	proto.RegisterType((*CommandCloseConsumer)(nil), "pulsar.proto.CommandCloseConsumer")
+	proto.RegisterType((*CommandRedeliverUnacknowledgedMessages)(nil), "pulsar.proto.CommandRedeliverUnacknowledgedMessages")
+	proto.RegisterType((*CommandSuccess)(nil), "pulsar.proto.CommandSuccess")
+	proto.RegisterType((*CommandProducerSuccess)(nil), "pulsar.proto.CommandProducerSuccess")
+	proto.RegisterType((*CommandError)(nil), "pulsar.proto.CommandError")
+	proto.RegisterType((*CommandPing)(nil), "pulsar.proto.CommandPing")
+	proto.RegisterType((*CommandPong)(nil), "pulsar.proto.CommandPong")
+	proto.RegisterType((*CommandConsumerStats)(nil), "pulsar.proto.CommandConsumerStats")
+	proto.RegisterType((*CommandConsumerStatsResponse)(nil), "pulsar.proto.CommandConsumerStatsResponse")
+	proto.RegisterType((*CommandGetLastMessageId)(nil), "pulsar.proto.CommandGetLastMessageId")
+	proto.RegisterType((*CommandGetLastMessageIdResponse)(nil), "pulsar.proto.CommandGetLastMessageIdResponse")
+	proto.RegisterType((*CommandGetTopicsOfNamespace)(nil), "pulsar.proto.CommandGetTopicsOfNamespace")
+	proto.RegisterType((*CommandGetTopicsOfNamespaceResponse)(nil), "pulsar.proto.CommandGetTopicsOfNamespaceResponse")
+	proto.RegisterType((*CommandGetSchema)(nil), "pulsar.proto.CommandGetSchema")
+	proto.RegisterType((*CommandGetSchemaResponse)(nil), "pulsar.proto.CommandGetSchemaResponse")
+	proto.RegisterType((*BaseCommand)(nil), "pulsar.proto.BaseCommand")
+	proto.RegisterEnum("pulsar.proto.CompressionType", CompressionType_name, CompressionType_value)
+	proto.RegisterEnum("pulsar.proto.ServerError", ServerError_name, ServerError_value)
+	proto.RegisterEnum("pulsar.proto.AuthMethod", AuthMethod_name, AuthMethod_value)
+	proto.RegisterEnum("pulsar.proto.ProtocolVersion", ProtocolVersion_name, ProtocolVersion_value)
+	proto.RegisterEnum("pulsar.proto.Schema_Type", Schema_Type_name, Schema_Type_value)
+	proto.RegisterEnum("pulsar.proto.CommandSubscribe_SubType", CommandSubscribe_SubType_name, CommandSubscribe_SubType_value)
+	proto.RegisterEnum("pulsar.proto.CommandSubscribe_InitialPosition", CommandSubscribe_InitialPosition_name, CommandSubscribe_InitialPosition_value)
+	proto.RegisterEnum("pulsar.proto.CommandPartitionedTopicMetadataResponse_LookupType", CommandPartitionedTopicMetadataResponse_LookupType_name, CommandPartitionedTopicMetadataResponse_LookupType_value)
+	proto.RegisterEnum("pulsar.proto.CommandLookupTopicResponse_LookupType", CommandLookupTopicResponse_LookupType_name, CommandLookupTopicResponse_LookupType_value)
+	proto.RegisterEnum("pulsar.proto.CommandAck_AckType", CommandAck_AckType_name, CommandAck_AckType_value)
+	proto.RegisterEnum("pulsar.proto.CommandAck_ValidationError", CommandAck_ValidationError_name, CommandAck_ValidationError_value)
+	proto.RegisterEnum("pulsar.proto.CommandGetTopicsOfNamespace_Mode", CommandGetTopicsOfNamespace_Mode_name, CommandGetTopicsOfNamespace_Mode_value)
+	proto.RegisterEnum("pulsar.proto.BaseCommand_Type", BaseCommand_Type_name, BaseCommand_Type_value)
+}
+
+// init registers the gzipped descriptor bytes below with the proto package
+// under the file name "PulsarApi.proto".
+func init() { proto.RegisterFile("PulsarApi.proto", fileDescriptor_PulsarApi_9ff3bd5e091a4809) }
+
+var fileDescriptor_PulsarApi_9ff3bd5e091a4809 = []byte{
+	// 4065 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x3b, 0x4d, 0x6f, 0x1b, 0x49,
+	0x76, 0x6e, 0x7e, 0x48, 0xe4, 0xe3, 0x57, 0xb9, 0x2c, 0x6b, 0xda, 0x1f, 0x63, 0xd3, 0xed, 0xb5,
+	0x47, 0xe3, 0x1d, 0x2b, 0xb6, 0xec, 0x75, 0x66, 0xbc, 0x9b, 0x60, 0x28, 0xaa, 0x6d, 0x33, 0x96,
+	0x48, 0x6d, 0x91, 0xf2, 0x62, 0x27, 0xbb, 0xe8, 0x6d, 0x75, 0x97, 0xa9, 0x86, 0x9a, 0xdd, 0x4c,
+	0x7f, 0x68, 0xac, 0x39, 0xe4, 0x36, 0x08, 0x02, 0x04, 0x08, 0x90, 0x1c, 0x73, 0xc8, 0x21, 0x08,
+	0x72, 0xce, 0x2d, 0x40, 0x7e, 0x43, 0x6e, 0xb9, 0xe4, 0x94, 0x4b, 0x72, 0x0c, 0x90, 0x43, 0x90,
+	0x73, 0x50, 0xd5, 0xd5, 0x5f, 0x24, 0x45, 0xca, 0x3b, 0x7b, 0xc8, 0x89, 0xdd, 0xaf, 0x5e, 0xbd,
+	0x7a, 0xf5, 0xde, 0xab, 0xf7, 0x55, 0x4d, 0x68, 0x1d, 0x86, 0xb6, 0xaf, 0x7b, 0x9d, 0xa9, 0xb5,
+	0x3d, 0xf5, 0xdc, 0xc0, 0xc5, 0xf5, 0x29, 0x07, 0x44, 0x6f, 0xca, 0x7f, 0x48, 0xb0, 0x36, 0x34,
+	0x4e, 0xe8, 0x44, 0xc7, 0x18, 0x4a, 0x8e, 0x3e, 0xa1, 0xb2, 0xd4, 0x2e, 0x6c, 0x55, 0x09, 0x7f,
+	0xc6, 0x77, 0xa1, 0xe6, 0xf3, 0x51, 0xcd, 0xd4, 0x03, 0x5d, 0x2e, 0xb6, 0x0b, 0x5b, 0x75, 0x02,
+	0x11, 0x68, 0x4f, 0x0f, 0x74, 0xfc, 0x18, 0x4a, 0xc1, 0xf9, 0x94, 0xca, 0xa5, 0x76, 0x61, 0xab,
+	0xb9, 0x73, 0x63, 0x3b, 0x4b, 0x7c, 0x3b, 0x22, 0xbc, 0x3d, 0x3a, 0x9f, 0x52, 0xc2, 0xd1, 0xf0,
+	0x0b, 0x80, 0xa9, 0xe7, 0x4e, 0xa9, 0x17, 0x58, 0xd4, 0x97, 0xcb, 0xed, 0xe2, 0x56, 0x6d, 0x67,
+	0x33, 0x3f, 0xe9, 0x2d, 0x3d, 0x7f, 0xa7, 0xdb, 0x21, 0x25, 0x19, 0x4c, 0xe5, 0x0f, 0xa1, 0xc4,
+	0xa8, 0xe0, 0x0a, 0x94, 0xfa, 0xae, 0x43, 0xd1, 0x15, 0x0c, 0xb0, 0x36, 0x0c, 0x3c, 0xcb, 0x19,
+	0x23, 0x89, 0x41, 0xff, 0xc8, 0x77, 0x1d, 0x54, 0xc0, 0x75, 0xa8, 0x1c, 0x32, 0x2a, 0xc7, 0xe1,
+	0x7b, 0x54, 0x64, 0xf0, 0xce, 0x99, 0xe7, 0xa2, 0x92, 0xf2, 0x17, 0x12, 0x34, 0x0e, 0xa8, 0xef,
+	0xeb, 0x63, 0xda, 0x33, 0x39, 0xe3, 0x37, 0xa1, 0x62, 0x53, 0x73, 0x4c, 0xbd, 0x9e, 0xc9, 0x77,
+	0x5c, 0x22, 0xc9, 0x3b, 0x96, 0x61, 0x9d, 0x3a, 0x81, 0x77, 0xde, 0x33, 0xe5, 0x02, 0x1f, 0x8a,
+	0x5f, 0x71, 0x1b, 0xaa, 0x53, 0xdd, 0x0b, 0xac, 0xc0, 0x72, 0x1d, 0xb9, 0xd8, 0x96, 0xb6, 0xca,
+	0x2f, 0x0b, 0x8f, 0x9f, 0x92, 0x14, 0x88, 0xef, 0x43, 0xed, 0x58, 0x0f, 0x8c, 0x13, 0xcd, 0x72,
+	0x4c, 0xfa, 0x41, 0x2e, 0x25, 0x38, 0xc0, 0xc1, 0x3d, 0x06, 0x55, 0x76, 0xa0, 0x12, 0x6f, 0x13,
+	0x23, 0x28, 0x9e, 0xd2, 0x73, 0x21, 0x75, 0xf6, 0x88, 0x37, 0xa0, 0x7c, 0xc6, 0x86, 0xf8, 0xe2,
+	0x55, 0x12, 0xbd, 0x28, 0x2f, 0xa0, 0xfe, 0x96, 0x9e, 0xef, 0xbb, 0xce, 0xf8, 0x52, 0xf3, 0x4a,
+	0xf1, 0x3c, 0x1b, 0x9a, 0xaa, 0x63, 0x78, 0xe7, 0x53, 0xc6, 0xde, 0x5b, 0x7a, 0xee, 0xaf, 0x9a,
+	0x59, 0x17, 0x33, 0xf1, 0x0e, 0x54, 0x26, 0x34, 0xd0, 0x85, 0xe6, 0x97, 0xa9, 0x2a, 0xc1, 0x53,
+	0xfe, 0xb7, 0x0c, 0x2d, 0x21, 0xe8, 0x03, 0x01, 0xc3, 0xf7, 0xa1, 0x31, 0xf5, 0x5c, 0x33, 0x34,
+	0xa8, 0xa7, 0x65, 0x2c, 0xac, 0x1e, 0x03, 0xfb, 0xb1, 0xa5, 0xd1, 0x3f, 0x09, 0xa9, 0x63, 0x50,
+	0xcd, 0x8a, 0xe5, 0x0e, 0x31, 0xa8, 0x67, 0xe2, 0x7b, 0x50, 0x9f, 0x86, 0xc7, 0xb6, 0xe5, 0x9f,
+	0x68, 0x81, 0x35, 0xa1, 0xdc, 0x16, 0x4b, 0xa4, 0x26, 0x60, 0x23, 0x6b, 0x32, 0x6b, 0x5d, 0xa5,
+	0xcb, 0x5a, 0x17, 0xfe, 0x0c, 0x5a, 0x1e, 0x9d, 0xda, 0x96, 0xa1, 0x07, 0xd4, 0xd4, 0xde, 0x7b,
+	0xee, 0x44, 0x2e, 0xb7, 0xa5, 0xad, 0x2a, 0x69, 0xa6, 0xe0, 0x57, 0x9e, 0x3b, 0xe1, 0x3b, 0x89,
+	0x35, 0xad, 0x31, 0x19, 0xae, 0x71, 0xb4, 0x7a, 0x02, 0x7c, 0x4b, 0xcf, 0x19, 0xa3, 0xc9, 0x34,
+	0x2d, 0x70, 0xe5, 0xf5, 0x76, 0x71, 0xab, 0x4a, 0x6a, 0x09, 0x6c, 0xe4, 0x62, 0x15, 0x6a, 0x86,
+	0x3b, 0x99, 0x7a, 0xd4, 0xf7, 0x99, 0x21, 0x55, 0xda, 0xd2, 0x56, 0x73, 0xe7, 0xd3, 0x3c, 0xa7,
+	0xdd, 0x14, 0x81, 0x99, 0xfe, 0xcb, 0x52, 0x7f, 0xd0, 0x57, 0x49, 0x76, 0x1e, 0xde, 0x86, 0xab,
+	0xa1, 0x13, 0x03, 0xa8, 0xa9, 0xf9, 0xd6, 0x77, 0x54, 0xae, 0xb6, 0xa5, 0xad, 0xc6, 0x4b, 0xe9,
+	0x09, 0x41, 0xd9, 0xb1, 0xa1, 0xf5, 0x1d, 0xc5, 0xcf, 0xe1, 0xba, 0x13, 0x4e, 0xb4, 0x49, 0xa4,
+	0x1f, 0x5f, 0xb3, 0x1c, 0x8d, 0x1b, 0xa5, 0x5c, 0xe3, 0x56, 0x2a, 0x3d, 0x25, 0xd8, 0x09, 0x27,
+	0x42, 0x7d, 0x7e, 0xcf, 0xd9, 0x65, 0x83, 0xb8, 0x0d, 0x40, 0xcf, 0xa8, 0x13, 0x44, 0x62, 0xaf,
+	0xb7, 0xa5, 0xad, 0x12, 0x23, 0x5f, 0xe5, 0x40, 0x2e, 0x77, 0x15, 0x5a, 0x34, 0x31, 0x31, 0x26,
+	0x17, 0x5f, 0x6e, 0x70, 0xe1, 0xdf, 0xce, 0x6f, 0x29, 0x6f, 0x87, 0xa4, 0x49, 0xf3, 0x76, 0xf9,
+	0x59, 0x8e, 0x8c, 0x6e, 0x8f, 0x5d, 0xb9, 0x19, 0xa9, 0x21, 0x05, 0x77, 0xec, 0xb1, 0x8b, 0x3f,
+	0x07, 0x94, 0x41, 0x9c, 0xea, 0x9e, 0x3e, 0x91, 0x5b, 0x6d, 0x69, 0xab, 0x4e, 0x32, 0x04, 0x0e,
+	0x19, 0x18, 0x3f, 0x80, 0xa6, 0x70, 0x60, 0x67, 0xd4, 0xe3, 0xc2, 0x46, 0x1c, 0xb1, 0x11, 0x41,
+	0xdf, 0x45, 0x40, 0xfc, 0x35, 0xdc, 0xc8, 0x29, 0x56, 0x3b, 0x7e, 0xf1, 0x5c, 0xa3, 0x8e, 0xe1,
+	0x9a, 0xd4, 0x94, 0xaf, 0xb6, 0xa5, 0xad, 0xca, 0xcb, 0xf2, 0x7b, 0xdd, 0xf6, 0x29, 0xd9, 0xcc,
+	0xea, 0x7a, 0xf7, 0xc5, 0x73, 0x35, 0x42, 0x52, 0xfe, 0xa1, 0x00, 0xd7, 0x87, 0x96, 0x33, 0xb6,
+	0xe9, 0xac, 0xf9, 0xe7, 0xad, 0x52, 0xba, 0xb4, 0x55, 0xce, 0x19, 0x5b, 0x61, 0xb1, 0xb1, 0x4d,
+	0xf5, 0x73, 0xdb, 0xd5, 0x85, 0xf6, 0xd9, 0xa9, 0x28, 0x93, 0x9a, 0x80, 0x71, 0xad, 0x3f, 0x82,
+	0x06, 0xb3, 0x03, 0xdd, 0x60, 0xc6, 0xed, 0x86, 0x01, 0xf7, 0x49, 0xc9, 0x7e, 0xea, 0xc9, 0xd8,
+	0x20, 0x0c, 0x66, 0x74, 0x5d, 0x5e, 0xa0, 0xeb, 0xa5, 0x92, 0x5a, 0xbb, 0x8c, 0xa4, 0xfe, 0xbe,
+	0x08, 0xcd, 0xae, 0x3b, 0x99, 0xe8, 0x8e, 0xd9, 0x75, 0x1d, 0x87, 0x1a, 0x01, 0xd3, 0x92, 0x61,
+	0x5b, 0x6c, 0xdd, 0x58, 0x4b, 0x91, 0x8b, 0x68, 0x44, 0xd0, 0x58, 0x4b, 0x5f, 0x41, 0x4d, 0x0f,
+	0x83, 0x13, 0x6d, 0x42, 0x83, 0x13, 0xd7, 0xe4, 0xf2, 0x68, 0xee, 0xc8, 0x79, 0x51, 0x76, 0xc2,
+	0xe0, 0xe4, 0x80, 0x8f, 0x13, 0xd0, 0x93, 0x67, 0xbc, 0x05, 0x28, 0x33, 0x35, 0x72, 0x43, 0xe2,
+	0x8c, 0xa7, 0x58, 0xdc, 0x11, 0xdd, 0x82, 0x2a, 0xc7, 0x14, 0x6e, 0x8f, 0x19, 0x4b, 0x85, 0x01,
+	0x78, 0xd4, 0xf8, 0x02, 0x10, 0x5f, 0xc6, 0x70, 0xed, 0x84, 0xd5, 0xc8, 0xc5, 0x4b, 0x4f, 0x48,
+	0x2b, 0x1e, 0x8a, 0xf9, 0x7d, 0x0c, 0xd7, 0xa6, 0x9e, 0xfb, 0xe1, 0x5c, 0x0b, 0x5c, 0xed, 0xd8,
+	0x73, 0x4f, 0xa9, 0xa7, 0x85, 0x9e, 0x2d, 0x9c, 0x06, 0xe2, 0x43, 0x23, 0x77, 0x97, 0x0f, 0x1c,
+	0x79, 0x36, 0x7e, 0x0c, 0xd8, 0xf5, 0xac, 0xb1, 0xe5, 0xe8, 0xb6, 0x36, 0xf5, 0x2c, 0xc7, 0xb0,
+	0xa6, 0xba, 0x2d, 0xaf, 0x73, 0xec, 0xab, 0xf1, 0xc8, 0x61, 0x3c, 0x80, 0xbf, 0xc8, 0xa0, 0xa7,
+	0x1c, 0x57, 0x22, 0xe2, 0xf1, 0x48, 0x27, 0xe6, 0xfc, 0x09, 0x6c, 0xe4, 0xb1, 0x85, 0x10, 0xab,
+	0x1c, 0x1f, 0x67, 0xf1, 0x23, 0x61, 0x28, 0x63, 0x40, 0x79, 0x35, 0x51, 0x93, 0x1f, 0x27, 0xea,
+	0x9d, 0x51, 0x6f, 0x56, 0x51, 0x11, 0x34, 0xde, 0xf8, 0x22, 0x31, 0x15, 0x2e, 0x12, 0x93, 0xf2,
+	0x2f, 0xe5, 0x64, 0xa5, 0x61, 0x78, 0xec, 0x1b, 0x9e, 0x75, 0x4c, 0x59, 0x48, 0x0a, 0xdc, 0xa9,
+	0x65, 0x88, 0x05, 0xa2, 0x17, 0xac, 0x40, 0xdd, 0x8f, 0x50, 0xf8, 0x19, 0x17, 0x11, 0x32, 0x07,
+	0xc3, 0x5f, 0xc3, 0xba, 0x1f, 0x1e, 0x33, 0x9f, 0xc9, 0x4f, 0x43, 0x73, 0xe7, 0xe1, 0x9c, 0x63,
+	0xcd, 0x2d, 0xb5, 0x3d, 0x8c, 0xb0, 0x49, 0x3c, 0x8d, 0xc5, 0x22, 0xc3, 0x75, 0xfc, 0x70, 0x42,
+	0x3d, 0x16, 0x8b, 0x4a, 0x51, 0x2c, 0x8a, 0x41, 0x3d, 0x13, 0x7f, 0x0a, 0xe0, 0xb1, 0xc8, 0xe4,
+	0x07, 0x6c, 0xbc, 0xcc, 0xc7, 0xab, 0x02, 0xd2, 0x33, 0xd9, 0xc9, 0x4d, 0xe6, 0x73, 0x4b, 0x13,
+	0x61, 0x22, 0x06, 0x72, 0x3b, 0x7b, 0x00, 0xcd, 0xa9, 0x67, 0xb9, 0x9e, 0x15, 0x9c, 0x6b, 0x36,
+	0x3d, 0xa3, 0x91, 0xa6, 0xcb, 0xa4, 0x11, 0x43, 0xf7, 0x19, 0x10, 0xdf, 0x81, 0x75, 0x33, 0xf4,
+	0xf4, 0x63, 0x9b, 0x72, 0xd5, 0x56, 0x5e, 0x96, 0x02, 0x2f, 0xa4, 0x24, 0x06, 0x62, 0x15, 0x90,
+	0x1f, 0xe8, 0x5e, 0x10, 0x7b, 0x75, 0xc6, 0x10, 0xd3, 0x69, 0x6d, 0xe7, 0x56, 0x7e, 0xdb, 0xb9,
+	0xf4, 0x87, 0x34, 0xf9, 0xa4, 0x04, 0x96, 0x8b, 0xf5, 0x70, 0xb9, 0x58, 0xcf, 0x76, 0xe0, 0x51,
+	0xdd, 0xd4, 0x12, 0x0f, 0xc2, 0xe3, 0x48, 0x85, 0x34, 0x18, 0xb4, 0x1b, 0x03, 0xf1, 0x17, 0xb0,
+	0x16, 0x39, 0x5b, 0x1e, 0x3b, 0x6a, 0x3b, 0x1b, 0x8b, 0x92, 0x44, 0x22, 0x70, 0xf0, 0x6f, 0xa0,
+	0x65, 0x39, 0x56, 0x60, 0xe9, 0xf6, 0xa1, 0xeb, 0x47, 0x79, 0x56, 0x83, 0x9f, 0xf3, 0xed, 0x15,
+	0x5a, 0xec, 0xe5, 0x67, 0xbd, 0x5c, 0xdb, 0xd7, 0x03, 0xea, 0x07, 0x64, 0x96, 0x9c, 0xb2, 0x03,
+	0xeb, 0x42, 0xe3, 0xb8, 0x01, 0x55, 0xf5, 0x83, 0x61, 0x87, 0xbe, 0x75, 0x16, 0xe7, 0x94, 0x27,
+	0xba, 0x47, 0x4d, 0x24, 0xb1, 0x4c, 0xf2, 0x95, 0x6e, 0xd9, 0xee, 0x19, 0xf5, 0x50, 0x41, 0xf9,
+	0x31, 0xb4, 0x66, 0xe8, 0x33, 0xe4, 0x68, 0x05, 0x74, 0x85, 0x21, 0xab, 0xba, 0x67, 0x5b, 0xec,
+	0x4d, 0x52, 0xfe, 0x53, 0x82, 0xbb, 0x82, 0xbd, 0xc3, 0xd8, 0x05, 0x52, 0x73, 0xc4, 0x0c, 0x38,
+	0x09, 0x0a, 0x8b, 0xcd, 0x3b, 0x6f, 0x57, 0x85, 0x59, 0xbb, 0x5a, 0xec, 0x20, 0x8a, 0x1f, 0xe7,
+	0x20, 0x4a, 0x1f, 0xe9, 0x20, 0xca, 0x17, 0x3a, 0x88, 0x7f, 0x2a, 0xc0, 0x67, 0x2b, 0xf6, 0x49,
+	0xa8, 0x3f, 0x75, 0x1d, 0x9f, 0xe2, 0x3b, 0x00, 0x49, 0x38, 0x60, 0x41, 0x50, 0xda, 0x6a, 0x90,
+	0x0c, 0x64, 0xd5, 0xce, 0x7f, 0x05, 0x15, 0x4f, 0x90, 0xe2, 0xfb, 0x6d, 0xee, 0x7c, 0xbd, 0xd0,
+	0x1c, 0x56, 0xf1, 0xb1, 0xbd, 0xef, 0xba, 0xa7, 0xe1, 0x94, 0x1f, 0xf7, 0x84, 0x22, 0xfe, 0x3d,
+	0x28, 0x53, 0xcf, 0x73, 0x3d, 0x2e, 0x9b, 0xf9, 0x2a, 0x86, 0xbb, 0x36, 0x95, 0x21, 0x90, 0x08,
+	0x8f, 0x15, 0x08, 0xe2, 0xb8, 0x09, 0xf1, 0xc4, 0xaf, 0xca, 0x03, 0x80, 0x74, 0x09, 0x5c, 0x63,
+	0xa6, 0x66, 0x18, 0xd4, 0xf7, 0x23, 0xeb, 0x62, 0x16, 0xc5, 0xac, 0x4b, 0xf9, 0xbe, 0x00, 0x58,
+	0xb0, 0x2c, 0xd0, 0xb9, 0xfe, 0x7f, 0x2b, 0xab, 0xf8, 0x31, 0x34, 0x98, 0xbe, 0x98, 0xcf, 0xd0,
+	0x03, 0xeb, 0x2c, 0x12, 0x50, 0x12, 0x85, 0xf3, 0x63, 0x17, 0x98, 0x50, 0xe9, 0xe3, 0x4c, 0xa8,
+	0xfc, 0x91, 0x26, 0xb4, 0x76, 0xa1, 0x09, 0xfd, 0x5b, 0x11, 0x6e, 0xce, 0xcb, 0x21, 0xb1, 0x9a,
+	0x47, 0x80, 0xa2, 0xb8, 0xc9, 0x74, 0x60, 0x19, 0xf4, 0xc8, 0xb3, 0xb9, 0xed, 0x54, 0xc9, 0x1c,
+	0x1c, 0x3f, 0x81, 0x6b, 0xb3, 0xb0, 0x91, 0xed, 0x8b, 0xa4, 0x69, 0xd1, 0x10, 0x1e, 0xcc, 0x19,
+	0xd5, 0xb3, 0x85, 0x46, 0xb5, 0x80, 0xb3, 0xc5, 0x76, 0x94, 0x57, 0x54, 0x69, 0xa5, 0xa2, 0xca,
+	0x4b, 0x14, 0x95, 0xd8, 0xe4, 0xda, 0xc7, 0xdb, 0xe4, 0x7a, 0xce, 0x26, 0x79, 0xca, 0x16, 0xa5,
+	0x21, 0x27, 0x9e, 0x1b, 0x8e, 0x4f, 0x34, 0x3f, 0x12, 0x03, 0x4f, 0x46, 0x2a, 0xf9, 0x94, 0x8d,
+	0xe7, 0x24, 0x11, 0x5a, 0x2a, 0x2c, 0xe5, 0x59, 0xce, 0xaa, 0xeb, 0x50, 0x21, 0xd4, 0xb4, 0x3c,
+	0x6a, 0x30, 0xdf, 0x57, 0x83, 0x75, 0x91, 0x1f, 0x20, 0x29, 0x63, 0xe3, 0x05, 0xe5, 0xaf, 0x0b,
+	0xd0, 0x8a, 0x8f, 0xa5, 0xa8, 0xf4, 0x2e, 0x30, 0xf0, 0xbb, 0x50, 0x4b, 0x0a, 0xc4, 0xb4, 0xf6,
+	0x8b, 0x41, 0x73, 0xf1, 0xb6, 0xb8, 0x20, 0xde, 0xe6, 0x0b, 0xcc, 0x92, 0xc8, 0x94, 0xb3, 0x05,
+	0xe6, 0x7d, 0xa8, 0x8a, 0xe2, 0x80, 0x9a, 0x79, 0xc9, 0xa7, 0xf0, 0x5c, 0x18, 0x5c, 0xbb, 0x64,
+	0x18, 0x4c, 0xe3, 0xdb, 0xfa, 0xea, 0xf8, 0xa6, 0x84, 0x50, 0x8b, 0x43, 0x17, 0x75, 0xcc, 0xd9,
+	0xad, 0x4b, 0x73, 0x5b, 0x5f, 0x59, 0x17, 0xff, 0x08, 0xea, 0xd9, 0xa2, 0x4e, 0x74, 0x25, 0xa4,
+	0xa7, 0xa4, 0x96, 0xa9, 0xe5, 0x94, 0xbf, 0x92, 0x12, 0x87, 0xc3, 0xd6, 0x25, 0xd4, 0xa0, 0xd6,
+	0x34, 0xf8, 0x1d, 0x2c, 0xff, 0x12, 0x20, 0x93, 0x79, 0x14, 0x57, 0x67, 0x1e, 0xd5, 0x49, 0xfc,
+	0xaa, 0xfc, 0xad, 0x94, 0x26, 0x7e, 0xd4, 0x31, 0xb9, 0x39, 0xff, 0x0e, 0x58, 0x4a, 0x8e, 0x4e,
+	0x71, 0x61, 0x53, 0x6a, 0xe9, 0xd1, 0x29, 0x71, 0xbb, 0x4c, 0xdc, 0xf9, 0xdf, 0x48, 0x49, 0xad,
+	0x22, 0x76, 0x31, 0x9b, 0x1c, 0x4a, 0x73, 0xc9, 0x61, 0x5e, 0x22, 0x8c, 0xbd, 0x4b, 0x4b, 0x84,
+	0x25, 0xce, 0x1e, 0x35, 0xa9, 0x6d, 0x9d, 0x51, 0xef, 0x5c, 0x33, 0xdc, 0xd0, 0x09, 0xb8, 0x4c,
+	0x79, 0x41, 0xdf, 0x4a, 0x87, 0xba, 0x6c, 0x44, 0xf9, 0x9f, 0x22, 0x80, 0xe0, 0xae, 0x63, 0x9c,
+	0xae, 0xe6, 0xec, 0xa7, 0x50, 0xd1, 0x8d, 0x53, 0x8d, 0x37, 0xec, 0x0a, 0x5c, 0x36, 0xed, 0x85,
+	0x0e, 0xaf, 0x63, 0x9c, 0x6e, 0x77, 0x8c, 0xd3, 0x28, 0x29, 0xd6, 0xa3, 0x87, 0x39, 0x45, 0x17,
+	0x3f, 0x62, 0x5b, 0x43, 0x40, 0x67, 0xba, 0x6d, 0x99, 0x3a, 0xaf, 0x1a, 0xb3, 0xb1, 0x76, 0xeb,
+	0x42, 0x06, 0xde, 0x25, 0x13, 0x22, 0x5d, 0xb5, 0xce, 0xf2, 0x00, 0xc6, 0xd0, 0x5c, 0x2f, 0xf1,
+	0xe6, 0xdc, 0x69, 0x4d, 0x1a, 0x66, 0xb9, 0x7e, 0xe2, 0xe7, 0xb0, 0x2e, 0x36, 0x88, 0x9b, 0x00,
+	0x3d, 0xc7, 0xb4, 0xce, 0x2c, 0x33, 0xd4, 0x6d, 0x74, 0x85, 0xbd, 0x77, 0xc3, 0x49, 0x68, 0x73,
+	0x37, 0x8c, 0x24, 0xe5, 0x2f, 0x25, 0x68, 0xcd, 0xf0, 0x82, 0xef, 0xc0, 0xcd, 0xa3, 0x99, 0xe6,
+	0x4a, 0xd7, 0xf5, 0xbc, 0x90, 0x17, 0x20, 0xe8, 0x0a, 0xde, 0x04, 0xbc, 0x47, 0x33, 0x9d, 0x1a,
+	0x3e, 0x0b, 0x49, 0x78, 0x03, 0x50, 0xf7, 0x84, 0x1a, 0xa7, 0x7e, 0x38, 0x39, 0xb0, 0xfc, 0x89,
+	0x1e, 0x18, 0x27, 0xa8, 0x80, 0x6f, 0xc0, 0x75, 0xde, 0x69, 0xd9, 0xa3, 0x43, 0xea, 0x59, 0xba,
+	0x6d, 0x7d, 0x47, 0xa3, 0x09, 0x45, 0x7c, 0x0d, 0x5a, 0x7b, 0x34, 0xee, 0x68, 0x44, 0xc0, 0x92,
+	0x72, 0x0c, 0xb7, 0x12, 0x39, 0x31, 0x26, 0xbb, 0x42, 0xc3, 0xdd, 0x13, 0xdd, 0xb9, 0x8c, 0x81,
+	0x2a, 0x50, 0xb5, 0x7c, 0x4d, 0xe7, 0x73, 0x79, 0x7c, 0x4c, 0x3c, 0x61, 0xc5, 0xf2, 0x23, 0x92,
+	0xca, 0xbb, 0xc4, 0x4d, 0xbd, 0xb2, 0xdd, 0x6f, 0x57, 0xd3, 0x7c, 0x08, 0x4d, 0xa1, 0xee, 0x43,
+	0xea, 0x4d, 0xac, 0xc0, 0xe7, 0x06, 0xd6, 0x20, 0x33, 0x50, 0x65, 0x94, 0xb8, 0xa1, 0x23, 0xc7,
+	0x4f, 0x8a, 0xbd, 0x95, 0xe4, 0x97, 0xa7, 0x40, 0xca, 0x9f, 0x4b, 0x19, 0xaf, 0x4a, 0x4f, 0x7f,
+	0x28, 0xbd, 0x1f, 0xe4, 0xd4, 0x7e, 0x0a, 0xb2, 0x60, 0x85, 0x50, 0xdd, 0x38, 0xa1, 0xa6, 0xea,
+	0x98, 0x83, 0xf7, 0xa3, 0x38, 0xd0, 0x2d, 0xe5, 0x4b, 0x79, 0x07, 0x1b, 0x71, 0xcd, 0x6d, 0xbb,
+	0x3e, 0x4d, 0xe2, 0xe6, 0x4a, 0xa7, 0xb8, 0x42, 0x40, 0x33, 0x74, 0x63, 0x8b, 0xf9, 0xc1, 0x82,
+	0xff, 0x33, 0x09, 0x1e, 0x26, 0xbb, 0x15, 0xce, 0xe9, 0xc8, 0xd1, 0x8d, 0x53, 0xc7, 0xfd, 0x96,
+	0xb7, 0xd3, 0x63, 0xb7, 0xe9, 0xaf, 0x5e, 0xea, 0x67, 0x50, 0x4b, 0x85, 0xce, 0xec, 0x67, 0xa5,
+	0x87, 0x81, 0x44, 0xea, 0xbe, 0xf2, 0xeb, 0xc4, 0x51, 0x8b, 0x8c, 0x7b, 0x86, 0x75, 0x69, 0x56,
+	0xc7, 0x69, 0xd8, 0x2e, 0x5c, 0x22, 0x6c, 0xff, 0xa3, 0x04, 0x9b, 0x33, 0xc9, 0xcc, 0x25, 0xd7,
+	0x99, 0x4b, 0x4e, 0x0a, 0x0b, 0xba, 0xdf, 0x5f, 0x00, 0xb2, 0x75, 0x3f, 0xd0, 0xb2, 0x81, 0x8d,
+	0x99, 0x5d, 0x91, 0x5f, 0x1d, 0x34, 0xd9, 0xd8, 0x30, 0x0d, 0x70, 0xf3, 0x4d, 0xcd, 0xd2, 0x82,
+	0xa6, 0xa6, 0xf2, 0x01, 0xea, 0x82, 0xe5, 0xc8, 0x6b, 0xad, 0x60, 0x34, 0x09, 0x9b, 0x85, 0x8f,
+	0x0f, 0x9b, 0xc5, 0x7c, 0xd8, 0x6c, 0x24, 0xc7, 0xf1, 0xd0, 0x72, 0xc6, 0xd9, 0x57, 0xd7, 0x19,
+	0x67, 0x8d, 0x51, 0x68, 0x7f, 0x18, 0xe8, 0xc1, 0x4a, 0x41, 0xae, 0xea, 0xca, 0x28, 0xff, 0x5d,
+	0x82, 0xdb, 0x8b, 0x08, 0x93, 0xc5, 0xf9, 0xf9, 0xdc, 0x02, 0x5f, 0x02, 0xf0, 0x8d, 0x69, 0x86,
+	0x6b, 0x52, 0xd1, 0x5d, 0x5c, 0x22, 0x85, 0x2a, 0x47, 0xee, 0xba, 0x26, 0xcb, 0x2d, 0x1b, 0xd1,
+	0xcc, 0x54, 0x1e, 0x3c, 0x01, 0xe5, 0xc0, 0x38, 0x71, 0xb8, 0x03, 0x30, 0xf1, 0xc7, 0x44, 0x0f,
+	0xe8, 0x40, 0x34, 0x61, 0x25, 0x92, 0x81, 0xb0, 0x62, 0x67, 0xe2, 0x8f, 0x45, 0xf2, 0x3d, 0x0d,
+	0x03, 0x86, 0x55, 0xe6, 0x58, 0x73, 0x70, 0x81, 0xcb, 0x66, 0x26, 0xc7, 0x8e, 0x17, 0x0a, 0x11,
+	0x6e, 0x0e, 0x8e, 0x15, 0xc8, 0x35, 0x9e, 0x44, 0x75, 0x90, 0x6f, 0x46, 0x3d, 0x02, 0xa4, 0x9f,
+	0xe9, 0x96, 0xad, 0x1f, 0xdb, 0x89, 0x03, 0x67, 0x95, 0x41, 0x89, 0xcc, 0xc1, 0xf1, 0x16, 0xb4,
+	0x42, 0x76, 0xc4, 0xd3, 0xb3, 0xcd, 0x1b, 0x4e, 0x25, 0x32, 0x0b, 0xc6, 0xbb, 0x70, 0xfb, 0xd8,
+	0x76, 0x19, 0x28, 0xd6, 0xc7, 0xc0, 0x39, 0x12, 0x38, 0xfe, 0xd8, 0x97, 0x81, 0xb7, 0x8b, 0x96,
+	0xe2, 0x30, 0x23, 0xd3, 0x4d, 0x93, 0xc5, 0x51, 0xde, 0x5d, 0xaa, 0x92, 0xf8, 0x95, 0x85, 0x1c,
+	0x23, 0x6e, 0x4c, 0x0e, 0x2d, 0xc7, 0x88, 0xee, 0x26, 0xaa, 0x64, 0x06, 0x8a, 0xb1, 0xb8, 0xa2,
+	0x6c, 0xf0, 0xd1, 0xe8, 0x1e, 0x92, 0x85, 0xab, 0x48, 0x4e, 0xea, 0x87, 0xa9, 0xe5, 0x51, 0x93,
+	0xdf, 0x34, 0x48, 0x64, 0x06, 0x2a, 0x74, 0xb6, 0xab, 0x1b, 0xa7, 0xb6, 0x3b, 0xe6, 0x77, 0x0c,
+	0x25, 0x92, 0x81, 0x28, 0xbf, 0x84, 0x4f, 0x84, 0xc5, 0xbd, 0xa6, 0xc1, 0xbe, 0xee, 0x67, 0x3a,
+	0x6a, 0x3f, 0xd4, 0xb5, 0x7e, 0x9f, 0x76, 0x91, 0x66, 0x69, 0x27, 0x06, 0xdd, 0x85, 0x16, 0x77,
+	0x1b, 0x99, 0x60, 0x25, 0xad, 0xce, 0x37, 0x1b, 0x76, 0x8e, 0xd1, 0x15, 0x7c, 0xfc, 0xbb, 0x94,
+	0xa4, 0x1b, 0xaf, 0x69, 0xc0, 0xe3, 0x98, 0x3f, 0x78, 0xcf, 0xac, 0xc6, 0x9f, 0xea, 0xc6, 0xca,
+	0x43, 0x75, 0x1b, 0xaa, 0x4e, 0x8c, 0x2b, 0x5c, 0x5f, 0x0a, 0xc0, 0x7d, 0x28, 0x4d, 0xd8, 0x61,
+	0x2b, 0x2e, 0x69, 0xf1, 0x2d, 0x5a, 0x75, 0xfb, 0xc0, 0x35, 0xe9, 0x4b, 0x38, 0x54, 0xc9, 0xb0,
+	0x37, 0x1c, 0xa9, 0xfd, 0x11, 0xe1, 0x74, 0x94, 0x67, 0x50, 0x62, 0x23, 0x2c, 0x89, 0x4b, 0xc7,
+	0xd0, 0x15, 0x8c, 0xa1, 0xd9, 0x1f, 0xf4, 0xb5, 0x0c, 0x4c, 0xc2, 0xeb, 0x50, 0xec, 0xec, 0xef,
+	0xa3, 0x82, 0xf2, 0x2b, 0xb8, 0xbf, 0x64, 0xa9, 0xcb, 0x7a, 0x8f, 0x4d, 0x58, 0xe3, 0xd5, 0x6c,
+	0x14, 0xb9, 0xaa, 0x44, 0xbc, 0x29, 0x4e, 0x52, 0xe3, 0xbc, 0xa6, 0x81, 0xb8, 0x6a, 0x5f, 0x41,
+	0x2a, 0xa9, 0x92, 0x0b, 0xd9, 0x2a, 0x79, 0xde, 0xeb, 0x17, 0x17, 0x79, 0xfd, 0xff, 0x92, 0x92,
+	0x04, 0x24, 0x59, 0xf0, 0xff, 0x89, 0x07, 0x4c, 0x43, 0x6e, 0xe9, 0x12, 0x9d, 0xe0, 0xf9, 0xfd,
+	0x96, 0x17, 0xed, 0xf7, 0x9f, 0x6f, 0x40, 0x6d, 0x57, 0x67, 0x39, 0x0d, 0xdf, 0x33, 0xde, 0x11,
+	0xc7, 0x5d, 0xe2, 0x51, 0xec, 0x4e, 0x7e, 0x89, 0x0c, 0x62, 0xfe, 0xb3, 0x84, 0x75, 0xe1, 0x34,
+	0x44, 0x32, 0x70, 0x7b, 0xa1, 0x25, 0x8a, 0x3e, 0x07, 0x89, 0x91, 0xf1, 0xcf, 0xa0, 0x9a, 0x38,
+	0x1b, 0x91, 0x26, 0xde, 0x59, 0x36, 0x93, 0x9a, 0x24, 0x9d, 0xc0, 0x66, 0x27, 0x29, 0xb0, 0x90,
+	0xc8, 0x9d, 0xe5, 0x4d, 0x6e, 0x92, 0x4e, 0xc0, 0x5f, 0x41, 0x25, 0x4e, 0x21, 0xb8, 0x60, 0x6a,
+	0x0b, 0x2e, 0x90, 0xb3, 0xe9, 0x0a, 0x49, 0xd0, 0xf1, 0x63, 0x28, 0xf9, 0xd4, 0x89, 0xfa, 0x72,
+	0xb5, 0x59, 0x05, 0x67, 0xbb, 0x04, 0x1c, 0x0d, 0x77, 0xa1, 0xce, 0x7e, 0x35, 0x2f, 0x6a, 0x1a,
+	0x88, 0x36, 0x47, 0xfb, 0xe2, 0x69, 0x11, 0x1e, 0xa9, 0xf9, 0x99, 0x4e, 0xc3, 0x1f, 0x00, 0x70,
+	0x22, 0x51, 0x8a, 0x51, 0x59, 0xb6, 0xdb, 0xb8, 0x15, 0x40, 0xaa, 0x7e, 0xd2, 0x15, 0x78, 0x91,
+	0xe6, 0x1a, 0xd5, 0x25, 0x1a, 0x12, 0x96, 0x96, 0xf6, 0xbe, 0x1e, 0x41, 0x51, 0x37, 0x4e, 0x79,
+	0xa4, 0xa9, 0xcd, 0x5e, 0x15, 0xa6, 0xc5, 0x26, 0x61, 0x48, 0x4c, 0x2c, 0xef, 0x6d, 0xf7, 0x5b,
+	0x1e, 0x67, 0x2e, 0x12, 0x0b, 0xab, 0x86, 0x08, 0x47, 0xc3, 0xbb, 0x50, 0x0b, 0xd3, 0x1a, 0x46,
+	0x5c, 0x6e, 0x2c, 0x96, 0x4a, 0xa6, 0xd6, 0x21, 0xd9, 0x49, 0x6c, 0x5b, 0x7e, 0x94, 0x46, 0xf2,
+	0xf0, 0x74, 0xd1, 0xb6, 0x44, 0xaa, 0x49, 0x62, 0x64, 0xfc, 0x24, 0xce, 0xd5, 0x9a, 0x7c, 0xd6,
+	0xcd, 0x85, 0xb3, 0x72, 0xc9, 0x5a, 0x0f, 0x9a, 0x06, 0x4b, 0xfd, 0xb5, 0xc4, 0x68, 0x5a, 0x7c,
+	0xaa, 0xb2, 0xd8, 0x5e, 0xb3, 0xd5, 0x07, 0x69, 0x18, 0xb9, 0x62, 0x24, 0x21, 0x15, 0x07, 0x33,
+	0x7e, 0xa7, 0xbe, 0x94, 0x54, 0x1c, 0xdb, 0x05, 0xa9, 0xa4, 0xfe, 0x18, 0xf0, 0x8b, 0xc2, 0x28,
+	0x39, 0x8e, 0x05, 0x71, 0x95, 0x13, 0xfb, 0xd1, 0x52, 0x63, 0x8e, 0x05, 0xd2, 0x9a, 0xce, 0x24,
+	0xe3, 0x8f, 0xa1, 0x34, 0xb5, 0x9c, 0xb1, 0x8c, 0x97, 0xe8, 0x90, 0xe5, 0xa4, 0x84, 0xa3, 0x71,
+	0x74, 0xd7, 0x19, 0xcb, 0xd7, 0x96, 0xa1, 0xbb, 0x1c, 0xdd, 0x75, 0xc6, 0xf8, 0x4f, 0xe1, 0xae,
+	0xb7, 0xbc, 0xcc, 0x91, 0x37, 0x38, 0xa5, 0xe7, 0x0b, 0x29, 0xad, 0x28, 0x91, 0xc8, 0x2a, 0xe2,
+	0xf8, 0x8f, 0xe1, 0x6a, 0x72, 0x67, 0x12, 0x5f, 0x6d, 0xc8, 0xd7, 0xf9, 0x8a, 0x8f, 0x3f, 0xee,
+	0x3e, 0x64, 0x9e, 0x0e, 0xf6, 0x33, 0x37, 0xfb, 0xb3, 0xf7, 0x26, 0xf2, 0x26, 0x5f, 0xe4, 0x27,
+	0xbf, 0xd5, 0xa5, 0x0b, 0xb9, 0x98, 0x2e, 0x3b, 0x44, 0x76, 0xda, 0x5e, 0x97, 0x3f, 0x59, 0x72,
+	0x88, 0xb2, 0x6d, 0xf8, 0xec, 0x24, 0xfc, 0x0d, 0x5c, 0xb3, 0xe7, 0x5b, 0xf4, 0xb2, 0xcc, 0x69,
+	0x6d, 0x5d, 0xb6, 0xa5, 0x4f, 0x16, 0x11, 0xc1, 0x6f, 0xd2, 0xab, 0x5c, 0x5e, 0x4b, 0xc8, 0x37,
+	0x96, 0x99, 0x7a, 0xae, 0xea, 0xc8, 0x4f, 0xc4, 0xbf, 0x81, 0xeb, 0xc6, 0xa2, 0xaa, 0x44, 0xbe,
+	0xc9, 0x29, 0x3e, 0xba, 0x04, 0xc5, 0x98, 0xd3, 0xc5, 0x84, 0xf0, 0x08, 0xae, 0x7a, 0xb3, 0x2d,
+	0x07, 0xf9, 0x16, 0xa7, 0xfe, 0xf0, 0x02, 0x7b, 0x9c, 0xc1, 0x26, 0xf3, 0x04, 0xa2, 0x60, 0x41,
+	0x4f, 0xe5, 0xdb, 0x4b, 0x83, 0x05, 0x3d, 0x25, 0x1c, 0x0d, 0xff, 0x1c, 0xd0, 0x78, 0x26, 0x5d,
+	0x95, 0x3f, 0xe5, 0x53, 0x1f, 0x5c, 0x94, 0xdd, 0xe5, 0x73, 0xdb, 0xb9, 0xe9, 0xd8, 0x02, 0x79,
+	0x7c, 0x41, 0x06, 0x2c, 0xdf, 0x59, 0x62, 0xfc, 0x17, 0xa5, 0xcd, 0xe4, 0x42, 0x72, 0x58, 0x83,
+	0xcd, 0xa8, 0x2f, 0x96, 0xf8, 0x36, 0xcd, 0xe0, 0x5d, 0x35, 0xf9, 0x2e, 0x5f, 0xe8, 0xf3, 0x0b,
+	0x22, 0xc8, 0x7c, 0x1b, 0x8e, 0x6c, 0xe8, 0x8b, 0x9a, 0x73, 0xbf, 0x86, 0x8d, 0xf1, 0x82, 0x24,
+	0x53, 0x6e, 0x2f, 0x21, 0xbf, 0x30, 0x2b, 0x5d, 0x48, 0x06, 0x87, 0x70, 0x7b, 0xbc, 0x24, 0x87,
+	0x95, 0xef, 0xf1, 0x65, 0x9e, 0x5e, 0x7e, 0x99, 0x58, 0x64, 0x4b, 0xc9, 0xb2, 0x4c, 0x66, 0x1c,
+	0xe7, 0x9a, 0xb2, 0xb2, 0x24, 0xb6, 0xa7, 0x19, 0x69, 0x3a, 0x81, 0xd9, 0xed, 0x78, 0x36, 0x53,
+	0x95, 0xef, 0x2f, 0xb1, 0xdb, 0xb9, 0xbc, 0x96, 0xcc, 0x13, 0x50, 0xfe, 0xae, 0x2c, 0xbe, 0x19,
+	0xad, 0xc1, 0x7a, 0x77, 0xd0, 0xef, 0xab, 0xdd, 0x11, 0x2a, 0xe0, 0x06, 0x54, 0xc5, 0x8b, 0xba,
+	0x87, 0x8a, 0xec, 0x75, 0x78, 0xb4, 0x3b, 0xec, 0x92, 0xde, 0xae, 0x8a, 0x4a, 0xfc, 0xf3, 0x51,
+	0x32, 0xd8, 0x3b, 0xea, 0xaa, 0x04, 0x95, 0x71, 0x05, 0x4a, 0x43, 0xb5, 0xbf, 0x87, 0xd6, 0x30,
+	0x82, 0x3a, 0x7b, 0xd2, 0x88, 0xda, 0x55, 0x7b, 0x87, 0x23, 0xb4, 0xce, 0x0a, 0x0c, 0x0e, 0x51,
+	0x09, 0x19, 0x10, 0x54, 0x61, 0x8b, 0x1c, 0xa8, 0xc3, 0x61, 0xe7, 0xb5, 0x8a, 0xaa, 0xbc, 0xb2,
+	0xe8, 0xbe, 0x45, 0xc0, 0x28, 0xbc, 0xda, 0x1f, 0xfc, 0x02, 0xd5, 0x70, 0x0b, 0x6a, 0x47, 0xfd,
+	0x74, 0xa9, 0x3a, 0xbf, 0x1a, 0x3e, 0xea, 0x76, 0xd5, 0xe1, 0x10, 0x35, 0x70, 0x15, 0xca, 0x11,
+	0xa1, 0x26, 0xab, 0x54, 0xba, 0xfb, 0x83, 0xa1, 0xaa, 0x25, 0x8c, 0xb4, 0x52, 0x58, 0x77, 0xd0,
+	0x1f, 0x1e, 0x1d, 0xa8, 0x04, 0x21, 0xbc, 0x01, 0x28, 0xc6, 0xd0, 0x62, 0x42, 0x57, 0xd9, 0x82,
+	0x87, 0xbd, 0xfe, 0x6b, 0x84, 0xf9, 0xd3, 0xa0, 0xff, 0x1a, 0x5d, 0xc3, 0x0f, 0xe0, 0x1e, 0x51,
+	0xf7, 0xd4, 0xfd, 0xde, 0x3b, 0x95, 0x68, 0x47, 0xfd, 0x4e, 0xf7, 0x6d, 0x7f, 0xf0, 0x8b, 0x7d,
+	0x75, 0xef, 0xb5, 0xba, 0xa7, 0x09, 0x9e, 0x87, 0x68, 0x03, 0xcb, 0xb0, 0x71, 0xd8, 0x21, 0xa3,
+	0xde, 0xa8, 0x37, 0xe8, 0xf3, 0x91, 0x51, 0x67, 0xaf, 0x33, 0xea, 0xa0, 0xeb, 0xf8, 0x1e, 0x7c,
+	0xba, 0x68, 0x44, 0x23, 0xea, 0xf0, 0x70, 0xd0, 0x1f, 0xaa, 0x68, 0x93, 0x7f, 0x0c, 0x31, 0x18,
+	0xbc, 0x3d, 0x3a, 0x44, 0x9f, 0xe0, 0x6b, 0xd0, 0x8a, 0x9e, 0x53, 0x04, 0x99, 0x6f, 0x41, 0x30,
+	0xaf, 0x0d, 0x47, 0x9d, 0xd1, 0x10, 0xdd, 0xc0, 0xb7, 0xe0, 0x93, 0x3c, 0x2c, 0x9d, 0x70, 0x93,
+	0xb1, 0x43, 0xd4, 0x4e, 0xf7, 0x8d, 0xba, 0xa7, 0x31, 0x39, 0x0f, 0x5e, 0x69, 0xa3, 0xc1, 0x61,
+	0xaf, 0x8b, 0x6e, 0x45, 0x6a, 0x51, 0xdf, 0xa2, 0xdb, 0xf8, 0x13, 0xb8, 0xf6, 0x5a, 0x1d, 0x69,
+	0xfb, 0x9d, 0xe1, 0x28, 0xde, 0x89, 0xd6, 0xdb, 0x43, 0x9f, 0xe2, 0x36, 0xdc, 0x5e, 0x30, 0x90,
+	0x92, 0xbf, 0x83, 0x6f, 0xc2, 0x66, 0xa7, 0x3b, 0xea, 0xbd, 0x4b, 0x65, 0xaa, 0x75, 0xdf, 0x74,
+	0xfa, 0xaf, 0x55, 0x74, 0x97, 0xf1, 0xc5, 0x66, 0xf3, 0xf5, 0x86, 0x6c, 0xe5, 0x7e, 0xe7, 0x40,
+	0x1d, 0x1e, 0x76, 0xba, 0x2a, 0x6a, 0xe3, 0x1f, 0x41, 0xfb, 0x82, 0xc1, 0x94, 0xfc, 0x3d, 0x66,
+	0x1e, 0x0c, 0x6b, 0xd8, 0x7d, 0xa3, 0x1e, 0x74, 0x90, 0x12, 0x73, 0x1a, 0xbd, 0xa7, 0x88, 0xf7,
+	0x1f, 0x7d, 0xc9, 0xef, 0x48, 0xb3, 0x1f, 0x7a, 0xf2, 0x6f, 0x9c, 0x07, 0x7d, 0x15, 0x5d, 0x61,
+	0x76, 0xb4, 0xff, 0xcd, 0xf3, 0xe8, 0x03, 0xe7, 0x6f, 0xf6, 0x7b, 0xbb, 0xa8, 0xc0, 0x9f, 0x86,
+	0xa3, 0x3d, 0x54, 0x7c, 0xf4, 0xaf, 0x45, 0xa8, 0x65, 0x8a, 0x31, 0x66, 0xa3, 0x47, 0x0e, 0xcb,
+	0x19, 0xc4, 0x3d, 0xc1, 0x15, 0x7c, 0x15, 0x1a, 0x71, 0xbc, 0xcd, 0x5c, 0x40, 0x1c, 0xb2, 0xba,
+	0xc9, 0x0f, 0xa8, 0x63, 0x88, 0x5b, 0x86, 0x02, 0xe3, 0xae, 0x13, 0x06, 0x27, 0xd4, 0x09, 0x2c,
+	0x23, 0xbd, 0xe5, 0x40, 0x45, 0xbc, 0x09, 0xb8, 0x13, 0xdd, 0x4a, 0x7f, 0x97, 0x81, 0x97, 0xd8,
+	0x5a, 0xb1, 0x5f, 0xdb, 0x0d, 0xfd, 0x73, 0x54, 0x66, 0x4a, 0x17, 0xf7, 0xc5, 0x7d, 0x37, 0x20,
+	0x54, 0x37, 0xcf, 0xd1, 0x1a, 0xb3, 0xbc, 0x38, 0x61, 0xdb, 0x8d, 0x7a, 0x3c, 0x3f, 0x0f, 0xdd,
+	0x40, 0x57, 0x3f, 0x18, 0x94, 0x9a, 0x34, 0xca, 0x4f, 0xd1, 0x3a, 0xfe, 0x1c, 0x1e, 0x2c, 0x45,
+	0xfb, 0x60, 0xd0, 0xe8, 0x62, 0xa5, 0xc2, 0xb6, 0x14, 0x5f, 0xa0, 0x44, 0xb3, 0xab, 0x4c, 0x5b,
+	0x2c, 0xbd, 0x9e, 0x4e, 0x5d, 0x2f, 0xa0, 0xa6, 0xa8, 0x0a, 0xa3, 0x41, 0x60, 0xf8, 0xdc, 0x6b,
+	0xf5, 0xdd, 0xe0, 0x95, 0x1b, 0x3a, 0x26, 0xaa, 0x31, 0xc3, 0x1a, 0x66, 0x3e, 0x17, 0x4b, 0x46,
+	0xea, 0xfc, 0x76, 0x26, 0x6e, 0x8a, 0xc5, 0xd0, 0x06, 0xdb, 0xd9, 0xc8, 0x75, 0x0f, 0x74, 0xe7,
+	0x9c, 0x44, 0x75, 0xb2, 0x8f, 0x9a, 0x8c, 0x08, 0xa7, 0x3b, 0xa2, 0xde, 0xc4, 0x72, 0xf4, 0x20,
+	0xde, 0x4c, 0x8b, 0x89, 0x26, 0xd9, 0x0c, 0x13, 0x0d, 0x3f, 0xa9, 0x3d, 0x87, 0x5f, 0x5e, 0x45,
+	0xac, 0xe8, 0x13, 0x8a, 0xae, 0x32, 0xd1, 0xf6, 0xf8, 0x15, 0x92, 0x1e, 0x58, 0xc7, 0x36, 0x8d,
+	0x9c, 0x17, 0xc2, 0x8f, 0xde, 0x02, 0xa4, 0xdf, 0x47, 0xb0, 0x63, 0x93, 0xbe, 0x89, 0x2f, 0xdf,
+	0xaf, 0x41, 0x2b, 0x85, 0xfd, 0xd2, 0xd0, 0xdf, 0x3d, 0x8d, 0x14, 0x9b, 0x02, 0x3b, 0x4c, 0x97,
+	0x3e, 0x2a, 0x3c, 0xfa, 0x5e, 0x82, 0xd6, 0xe1, 0xcc, 0x47, 0x89, 0x6b, 0x50, 0x38, 0x7b, 0x82,
+	0xae, 0xf0, 0x5f, 0x36, 0x93, 0xfd, 0xee, 0xa0, 0x02, 0xff, 0x7d, 0x86, 0x8a, 0xfc, 0xf7, 0x39,
+	0x2a, 0xf1, 0xdf, 0x9f, 0xa0, 0x32, 0xff, 0x7d, 0x81, 0xd6, 0xf8, 0xef, 0xef, 0xa3, 0x75, 0xfe,
+	0xfb, 0x25, 0xaa, 0xf0, 0xdf, 0xaf, 0x22, 0x67, 0x77, 0xf6, 0xf4, 0x09, 0x82, 0xe8, 0xe1, 0x29,
+	0xaa, 0x45, 0x0f, 0x3b, 0xa8, 0x1e, 0x3d, 0x3c, 0x43, 0x8d, 0xdd, 0x87, 0xa0, 0xb8, 0xde, 0x78,
+	0x5b, 0x9f, 0xb2, 0xe4, 0x22, 0x76, 0xe9, 0x86, 0x3b, 0x99, 0xb8, 0xce, 0xb6, 0x1e, 0xff, 0x33,
+	0xe1, 0x4d, 0xf1, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x3f, 0x99, 0x3c, 0x38, 0xad, 0x30, 0x00,
+	0x00,
+}
diff --git a/pulsar/reader.go b/pulsar/reader.go
new file mode 100644
index 0000000..5592630
--- /dev/null
+++ b/pulsar/reader.go
@@ -0,0 +1,84 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import "context"
+
+type ReaderMessage struct { // ReaderMessage pairs a Message with a Reader; it is the element type of ReaderOptions.MessageChannel.
+	Reader  // Embedded Reader; presumably the reader that received the message (TODO confirm).
+	Message // Embedded Message.
+}
+
+// ReaderOptions holds the settings used to configure a Reader.
+type ReaderOptions struct {
+	// Topic is the topic the reader will read from. It is required when constructing the reader.
+	Topic string
+
+	// Name sets the reader name.
+	Name string
+
+	// StartMessageID sets the initial position of the reader. The options are:
+	//  * `pulsar.EarliestMessage` : Start reading from the earliest message available in the topic
+	//  * `pulsar.LatestMessage` : Start reading from the end of the topic, only getting messages published
+	//                           after the reader was created
+	//  * `MessageID` : Start reading from a particular message id; the reader will position itself on that
+	//                  specific position. The first message to be read will be the one following the specified
+	//                  message id
+	StartMessageID MessageID
+
+	// MessageChannel sets a channel for the reader.
+	// When a message is received, it will be pushed to the channel for consumption.
+	MessageChannel chan ReaderMessage
+
+	// ReceiverQueueSize sets the size of the reader receive queue.
+	// The receive queue controls how many messages can be accumulated by the Reader before the
+	// application calls Reader.Next(). Using a higher value could potentially increase the reader
+	// throughput at the expense of bigger memory utilization.
+	//
+	// Default value is 1000 messages and should be good for most use cases.
+	ReceiverQueueSize int
+
+	// SubscriptionRolePrefix sets the subscription role prefix. The default prefix is "reader".
+	SubscriptionRolePrefix string
+
+	// ReadCompacted, if enabled, makes the reader read messages from the compacted topic rather than the full message
+	// backlog of the topic. This means that, if the topic has been compacted, the reader will only see the latest value
+	// for each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
+	// point, the messages will be sent as normal.
+	//
+	// ReadCompacted can only be enabled when reading from a persistent topic. Attempting to enable it on non-persistent
+	// topics will cause the reader create call to fail with an error.
+	ReadCompacted bool
+}
+
+// Reader can be used to scan through all the messages currently available in a topic.
+type Reader interface {
+	// Topic returns the topic from which this reader is reading.
+	Topic() string
+
+	// Next reads the next message in the topic, blocking until a message is available.
+	Next(context.Context) (Message, error)
+
+	// HasNext checks if there is any message available to read from the current position.
+	HasNext() (bool, error)
+
+	// Close closes the reader and stops the broker from pushing more messages.
+	Close() error
+}