You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/11/11 21:57:55 UTC

[pulsar-client-go] branch master updated: Simplify and refactor parts of the single topic consumer. (#86)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 39d7cdc  Simplify and refactor parts of  the single topic consumer. (#86)
39d7cdc is described below

commit 39d7cdc46ac49299148f293ae9d5fa154d29b8ad
Author: cckellogg <cc...@gmail.com>
AuthorDate: Mon Nov 11 13:57:45 2019 -0800

    Simplify and refactor parts of  the single topic consumer. (#86)
    
    * Simplify and refactor parts of  the single topic consumer.
    
    * Wait when closing a consumer and change get chan method.
---
 pulsar/consumer.go                |  49 +--
 pulsar/consumer_impl.go           | 278 ++++++++++++++
 pulsar/consumer_partition.go      | 510 ++++++++++++++++++++++++++
 pulsar/consumer_test.go           | 442 ++++-------------------
 util/error.go => pulsar/helper.go |  12 +-
 pulsar/impl_client.go             |   9 +-
 pulsar/impl_consumer.go           | 316 ----------------
 pulsar/impl_partition_consumer.go | 739 --------------------------------------
 pulsar/internal/connection.go     |  19 +-
 pulsar/internal/rpc_client.go     |   3 +
 pulsar/test_helper.go             |  20 ++
 pulsar/unacked_msg_tracker.go     |   2 +-
 util/util.go                      |  44 ---
 util/util_test.go                 |  41 ---
 14 files changed, 914 insertions(+), 1570 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index c259cd6..3b729ec 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -35,11 +35,13 @@ const (
 	// Exclusive there can be only 1 consumer on the same topic with the same subscription name
 	Exclusive SubscriptionType = iota
 
-	// Shared subscription mode, multiple consumer will be able to use the same subscription name and the messages will be dispatched according to
+	// Shared subscription mode, multiple consumer will be able to use the same subscription name
+	// and the messages will be dispatched according to
 	// a round-robin rotation between the connected consumers
 	Shared
 
-	// Failover subscription mode, multiple consumer will be able to use the same subscription name but only 1 consumer will receive the messages.
+	// Failover subscription mode, multiple consumer will be able to use the same subscription name
+	// but only 1 consumer will receive the messages.
 	// If that consumer disconnects, one of the other connected consumers will start receiving messages.
 	Failover
 
@@ -48,14 +50,14 @@ const (
 	KeyShared
 )
 
-type InitialPosition int
+type SubscriptionInitialPosition int
 
 const (
 	// Latest position which means the start consuming position will be the last message
-	Latest InitialPosition = iota
+	SubscriptionPositionLatest SubscriptionInitialPosition = iota
 
 	// Earliest position which means the start consuming position will be the first message
-	Earliest
+	SubscriptionPositionEarliest
 )
 
 // ConsumerOptions is used to configure and create instances of Consumer
@@ -91,7 +93,7 @@ type ConsumerOptions struct {
 
 	// InitialPosition at which the cursor will be set when subscribe
 	// Default is `Latest`
-	SubscriptionInitPos InitialPosition
+	SubscriptionInitialPosition
 
 	// Sets a `MessageChannel` for the consumer
 	// When a message is received, it will be pushed to the channel for consumption
@@ -139,11 +141,8 @@ type Consumer interface {
 	// This calls blocks until a message is available.
 	Receive(context.Context) (Message, error)
 
-	// ReceiveAsync appends the message to the msgs channel asynchronously.
-	ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error
-
-	// ReceiveAsyncWithCallback returns a callback containing the message and error objects
-	ReceiveAsyncWithCallback(ctx context.Context, callback func(msg Message, err error))
+	// Chan returns a channel to consume messages from
+	Chan() <-chan ConsumerMessage
 
 	// Ack the consumption of a single message
 	Ack(Message) error
@@ -151,34 +150,6 @@ type Consumer interface {
 	// AckID the consumption of a single message, identified by its MessageID
 	AckID(MessageID) error
 
-	// AckCumulative the reception of all the messages in the stream up to (and including) the provided message.
-	// This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
-	// re-delivered to this consumer.
-	//
-	// Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
-	//
-	// It's equivalent to calling asyncAcknowledgeCumulative(Message) and waiting for the callback to be triggered.
-	AckCumulative(Message) error
-
-	// AckCumulativeID the reception of all the messages in the stream up to (and including) the provided message.
-	// This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
-	// re-delivered to this consumer.
-	// Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
-	// It's equivalent to calling asyncAcknowledgeCumulative(MessageID) and waiting for the callback to be triggered.
-	AckCumulativeID(MessageID) error
-
 	// Close the consumer and stop the broker to push more messages
 	Close() error
-
-	// Seek reset the subscription associated with this consumer to a specific message id.
-	// The message id can either be a specific message or represent the first or last messages in the topic.
-	// Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
-	//       seek() on the individual partitions.
-	Seek(msgID MessageID) error
-
-	// RedeliverUnackedMessages redeliver all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is not
-	// active for the given topic. In Shared mode, the consumers messages to be redelivered are distributed across all
-	// the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection
-	// breaks, the messages are redelivered after reconnect.
-	RedeliverUnackedMessages() error
 }
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
new file mode 100644
index 0000000..af9b8d5
--- /dev/null
+++ b/pulsar/consumer_impl.go
@@ -0,0 +1,278 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"math/rand"
+	"sync"
+	"time"
+
+	log "github.com/sirupsen/logrus"
+
+	"github.com/apache/pulsar-client-go/pkg/pb"
+	"github.com/apache/pulsar-client-go/pulsar/internal"
+)
+
+// ErrConsumerClosed is returned by Receive when the consumer's message channel has been closed.
+var ErrConsumerClosed = errors.New("consumer closed")
+
+// consumer is the Consumer implementation for a single topic. It owns one
+// partitionConsumer per partition; each of them is created with the same
+// messageCh that Receive and Chan read from.
+type consumer struct {
+	topic string
+
+	// options is read by Subscription() to report the subscription name
+	options ConsumerOptions
+
+	// one entry per partition, indexed by partition index
+	consumers []*partitionConsumer
+
+	// channel used to deliver message to clients
+	messageCh chan ConsumerMessage
+
+	// NOTE(review): neither channel is read in the code visible in this file — confirm intended use
+	closeCh chan struct{}
+	errorCh chan error
+
+	log *log.Entry
+}
+
+// newConsumer validates the consumer options, applies defaults and subscribes to the topic.
+//
+// A topic (Topic, Topics or TopicsPattern) and a subscription name are required.
+// ReceiverQueueSize defaults to 1000 and, when the caller did not provide a
+// MessageChannel, a buffered channel of size 10 is created. Only the single topic
+// case (Topic set, or exactly one entry in Topics) is handled; anything else
+// results in a ResultInvalidTopicName error.
+func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
+	noTopic := options.Topic == "" && options.Topics == nil && options.TopicsPattern == ""
+	if noTopic {
+		return nil, newError(TopicNotFound, "topic is required")
+	}
+
+	if options.SubscriptionName == "" {
+		return nil, newError(SubscriptionNotFound, "subscription name is required for consumer")
+	}
+
+	if options.ReceiverQueueSize == 0 {
+		options.ReceiverQueueSize = 1000
+	}
+
+	// fall back to an internal channel when the user did not pass one in
+	messageCh := options.MessageChannel
+	if messageCh == nil {
+		messageCh = make(chan ConsumerMessage, 10)
+	}
+
+	// pick the single topic to subscribe to
+	var topic string
+	switch {
+	case options.Topic != "":
+		topic = options.Topic
+	case len(options.Topics) == 1:
+		topic = options.Topics[0]
+	default:
+		return nil, newError(ResultInvalidTopicName, "topic name is required for consumer")
+	}
+
+	if _, err := internal.ParseTopicName(topic); err != nil {
+		return nil, err
+	}
+
+	return topicSubscribe(client, options, topic, messageCh)
+}
+
+// topicSubscribe creates a consumer for a single topic by subscribing to each of
+// its partitions concurrently. All partition consumers deliver into messageCh.
+//
+// If any partition fails to subscribe, the partition consumers that were created
+// successfully are closed and one of the subscribe errors is returned.
+func topicSubscribe(client *client, options ConsumerOptions, topic string,
+	messageCh chan ConsumerMessage) (Consumer, error) {
+	// options must be stored on the consumer: Subscription() reads the name from it
+	consumer := &consumer{
+		topic:     topic,
+		options:   options,
+		messageCh: messageCh,
+		errorCh:   make(chan error),
+		log:       log.WithField("topic", topic),
+	}
+
+	partitions, err := client.TopicPartitions(topic)
+	if err != nil {
+		return nil, err
+	}
+
+	numPartitions := len(partitions)
+	consumer.consumers = make([]*partitionConsumer, numPartitions)
+
+	// ConsumerError carries the outcome of subscribing to one partition
+	type ConsumerError struct {
+		err       error
+		partition int
+		consumer  *partitionConsumer
+	}
+
+	consumerName := options.Name
+	if consumerName == "" {
+		consumerName = generateRandomName()
+	}
+
+	receiverQueueSize := options.ReceiverQueueSize
+	var wg sync.WaitGroup
+	// buffered with one slot per partition so no subscribing goroutine blocks on send
+	ch := make(chan ConsumerError, numPartitions)
+	for partitionIdx, partitionTopic := range partitions {
+		wg.Add(1)
+		go func(idx int, pt string) {
+			defer wg.Done()
+			opts := &partitionConsumerOpts{
+				topic:               pt,
+				consumerName:        consumerName,
+				subscription:        options.SubscriptionName,
+				subscriptionType:    options.Type,
+				subscriptionInitPos: options.SubscriptionInitialPosition,
+				partitionIdx:        idx,
+				receiverQueueSize:   receiverQueueSize,
+			}
+			cons, err := newPartitionConsumer(consumer, client, opts, messageCh)
+			ch <- ConsumerError{
+				err:       err,
+				partition: idx,
+				consumer:  cons,
+			}
+		}(partitionIdx, partitionTopic)
+	}
+
+	// close the result channel once every partition has reported, ending the range below
+	go func() {
+		wg.Wait()
+		close(ch)
+	}()
+
+	for ce := range ch {
+		if ce.err != nil {
+			err = ce.err
+		} else {
+			consumer.consumers[ce.partition] = ce.consumer
+		}
+	}
+
+	if err != nil {
+		// Since there were some failures,
+		// cleanup all the partitions that succeeded in creating the consumer
+		for _, c := range consumer.consumers {
+			if c != nil {
+				_ = c.Close()
+			}
+		}
+		return nil, err
+	}
+
+	return consumer, nil
+}
+
+// Topic returns the topic name this consumer was created for.
+func (c *consumer) Topic() string {
+	return c.topic
+}
+
+// Subscription returns the subscription name stored in the consumer options.
+func (c *consumer) Subscription() string {
+	return c.options.SubscriptionName
+}
+
+// Unsubscribe unsubscribes every partition consumer. The failures, if any, are
+// collected into a single error; nil is returned when all partitions succeed.
+func (c *consumer) Unsubscribe() error {
+	var errMsg string
+	for _, consumer := range c.consumers {
+		if err := consumer.Unsubscribe(); err != nil {
+			errMsg += fmt.Sprintf("topic %s, subscription %s: %s", c.Topic(), c.Subscription(), err)
+		}
+	}
+	if errMsg != "" {
+		// errMsg is data, not a format string: it may contain '%' coming from the
+		// underlying errors, so it must not be passed to fmt.Errorf as the format
+		return errors.New(errMsg)
+	}
+	return nil
+}
+
+// Receive blocks until a message is available on the message channel or ctx is done.
+// It returns ErrConsumerClosed when the message channel has been closed and
+// ctx.Err() when the context ends first.
+func (c *consumer) Receive(ctx context.Context) (message Message, err error) {
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case cm, ok := <-c.messageCh:
+		if ok {
+			return cm.Message, nil
+		}
+		return nil, ErrConsumerClosed
+	}
+}
+
+// Chan returns the receive-only channel on which messages are delivered to the client.
+func (c *consumer) Chan() <-chan ConsumerMessage {
+	return c.messageCh
+}
+
+// Ack acknowledges the consumption of a single message.
+// It delegates to AckID using the ID of msg.
+func (c *consumer) Ack(msg Message) error {
+	return c.AckID(msg.ID())
+}
+
+// AckID acknowledges the consumption of a single message, identified by its MessageID.
+// The ack is routed to the partition consumer selected by the partition index stored in the ID.
+// An error is returned when msgID is not a *messageID or its partition index is out of range.
+func (c *consumer) AckID(msgID MessageID) error {
+	mid, ok := msgID.(*messageID)
+	if !ok {
+		return fmt.Errorf("invalid message id type")
+	}
+
+	partition := mid.partitionIdx
+	// did we receive a valid partition index?
+	if partition < 0 || partition >= len(c.consumers) {
+		// valid indexes are 0..len-1, so the upper bound is reported as exclusive
+		return fmt.Errorf("invalid partition index %d expected a partition in [0, %d)",
+			partition, len(c.consumers))
+	}
+	return c.consumers[partition].AckID(msgID)
+}
+
+// Close closes all partition consumers concurrently and waits for them to finish.
+// It returns the first error, in partition order, reported by a partition
+// consumer, or nil when every partition closed cleanly.
+func (c *consumer) Close() error {
+	// one slot per partition so that goroutines never write to the same element
+	errs := make([]error, len(c.consumers))
+
+	var wg sync.WaitGroup
+	for i := range c.consumers {
+		wg.Add(1)
+		go func(idx int, pc *partitionConsumer) {
+			defer wg.Done()
+			errs[idx] = pc.Close()
+		}(i, c.consumers[i])
+	}
+	wg.Wait()
+
+	for _, err := range errs {
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// random is the source used to generate consumer names, seeded with the current time.
+var random = rand.New(rand.NewSource(time.Now().UnixNano()))
+
+// generateRandomName returns a name made of 5 random lowercase ASCII letters.
+func generateRandomName() string {
+	const alphabet = "abcdefghijklmnopqrstuvwxyz"
+	const nameLen = 5
+
+	name := make([]byte, 0, nameLen)
+	for len(name) < nameLen {
+		name = append(name, alphabet[random.Intn(len(alphabet))])
+	}
+	return string(name)
+}
+
+// toProtoSubType maps a SubscriptionType to its protobuf counterpart.
+// Unknown values map to the exclusive subscription type.
+func toProtoSubType(st SubscriptionType) pb.CommandSubscribe_SubType {
+	switch st {
+	case Shared:
+		return pb.CommandSubscribe_Shared
+	case Failover:
+		return pb.CommandSubscribe_Failover
+	case KeyShared:
+		return pb.CommandSubscribe_Key_Shared
+	default:
+		// covers Exclusive as well as any unrecognised value
+		return pb.CommandSubscribe_Exclusive
+	}
+}
+
+// toProtoInitialPosition maps a SubscriptionInitialPosition to its protobuf counterpart.
+// Anything other than SubscriptionPositionEarliest maps to the latest position.
+func toProtoInitialPosition(p SubscriptionInitialPosition) pb.CommandSubscribe_InitialPosition {
+	if p == SubscriptionPositionEarliest {
+		return pb.CommandSubscribe_Earliest
+	}
+	return pb.CommandSubscribe_Latest
+}
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
new file mode 100644
index 0000000..ae4889b
--- /dev/null
+++ b/pulsar/consumer_partition.go
@@ -0,0 +1,510 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"fmt"
+	"math"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+
+	log "github.com/sirupsen/logrus"
+
+	"github.com/apache/pulsar-client-go/pkg/pb"
+	"github.com/apache/pulsar-client-go/pulsar/internal"
+)
+
+// consumerState tracks the lifecycle of a partitionConsumer.
+type consumerState int
+
+// A partition consumer starts in consumerInit and moves to consumerReady once the
+// initial connection succeeds. internalClose sets consumerClosing before sending the
+// close request and consumerClosed once the request has succeeded.
+const (
+	consumerInit consumerState = iota
+	consumerReady
+	consumerClosing
+	consumerClosed
+)
+
+// partitionConsumerOpts holds the settings used to create a partitionConsumer:
+// the partition topic and its index, the consumer name, the subscription name,
+// type and initial position sent in the subscribe command, and the receiver queue
+// size used to size the internal queue and the initial flow permits.
+type partitionConsumerOpts struct {
+	topic               string
+	consumerName        string
+	subscription        string
+	subscriptionType    SubscriptionType
+	subscriptionInitPos SubscriptionInitialPosition
+	partitionIdx        int
+	receiverQueueSize   int
+}
+
+// partitionConsumer consumes messages from a single partition of a topic.
+// Messages received from the connection are queued on queueCh and forwarded by the
+// dispatcher goroutine to the shared messageCh; ack, unsubscribe, close and
+// reconnect requests are serialized through eventsCh and handled by the events loop.
+type partitionConsumer struct {
+	client *client
+
+	// this is needed for sending ConsumerMessage on the messageCh
+	parentConsumer Consumer
+	state          consumerState
+	options        *partitionConsumerOpts
+
+	// connection to the broker, set by grabConn
+	conn internal.Connection
+
+	topic        string
+	name         string
+	consumerID   uint64
+	partitionIdx int
+
+	// shared channel
+	messageCh chan ConsumerMessage
+
+	// the number of message slots available
+	availablePermits int32
+
+	// the size of the queue channel for buffering messages
+	queueSize int32
+	queueCh   chan []*message
+
+	// eventsCh carries requests for the events loop, connectedCh notifies the
+	// dispatcher of a new connection and closeCh is closed once the consumer is closed
+	eventsCh    chan interface{}
+	connectedCh chan struct{}
+	closeCh     chan struct{}
+
+	log *log.Entry
+}
+
+// newPartitionConsumer creates a consumer for a single partition, connects it to
+// the broker and starts its dispatcher and events loop goroutines.
+//
+// parent is the Consumer reported in every ConsumerMessage and messageCh is the
+// channel messages are delivered to. An error is returned, and no goroutine is
+// started, when the initial connection fails.
+func newPartitionConsumer(parent Consumer, client *client, options *partitionConsumerOpts,
+	messageCh chan ConsumerMessage) (*partitionConsumer, error) {
+	pc := &partitionConsumer{
+		state:          consumerInit,
+		parentConsumer: parent,
+		client:         client,
+		options:        options,
+		topic:          options.topic,
+		name:           options.consumerName,
+		consumerID:     client.rpcClient.NewConsumerID(),
+		partitionIdx:   options.partitionIdx,
+		eventsCh:       make(chan interface{}, 3),
+		queueSize:      int32(options.receiverQueueSize),
+		queueCh:        make(chan []*message, options.receiverQueueSize),
+		connectedCh:    make(chan struct{}),
+		messageCh:      messageCh,
+		closeCh:        make(chan struct{}),
+		log:            log.WithField("topic", options.topic),
+	}
+	pc.log = pc.log.WithField("name", pc.name).WithField("subscription", options.subscription)
+
+	if err := pc.grabConn(); err != nil {
+		// log through pc.log so the topic, name and subscription fields are included
+		pc.log.WithError(err).Error("Failed to create consumer")
+		return nil, err
+	}
+	pc.log.Info("Created consumer")
+	pc.state = consumerReady
+
+	go pc.dispatcher()
+
+	go pc.runEventsLoop()
+
+	return pc, nil
+}
+
+// Unsubscribe queues an unsubscribe request on the events channel and blocks
+// until it has been processed, returning the error recorded on the request.
+func (pc *partitionConsumer) Unsubscribe() error {
+	done := make(chan struct{})
+	request := &unsubscribeRequest{doneCh: done}
+	pc.eventsCh <- request
+
+	// the events loop closes the channel once the request has been handled
+	<-done
+	return request.err
+}
+
+// internalUnsubscribe sends the unsubscribe command to the broker and waits for the
+// response. A failure is logged and stored on the request. The consume handler is
+// removed from the connection in either case, and doneCh is closed on return.
+func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
+	defer close(unsub.doneCh)
+
+	rpc := pc.client.rpcClient
+	id := rpc.NewRequestID()
+	cmd := &pb.CommandUnsubscribe{
+		RequestId:  proto.Uint64(id),
+		ConsumerId: proto.Uint64(pc.consumerID),
+	}
+	if _, err := rpc.RequestOnCnx(pc.conn, id, pb.BaseCommand_UNSUBSCRIBE, cmd); err != nil {
+		pc.log.WithError(err).Error("Failed to unsubscribe consumer")
+		unsub.err = err
+	}
+
+	pc.conn.DeleteConsumeHandler(pc.consumerID)
+}
+
+// Ack acknowledges a single message by delegating to AckID with the ID of msg.
+func (pc *partitionConsumer) Ack(msg Message) error {
+	return pc.AckID(msg.ID())
+}
+
+// AckID queues an ack request for msgID on the events channel and blocks until
+// it has been processed, returning the error recorded on the request.
+func (pc *partitionConsumer) AckID(msgID MessageID) error {
+	done := make(chan struct{})
+	request := &ackRequest{
+		msgID:  msgID,
+		doneCh: done,
+	}
+	pc.eventsCh <- request
+
+	// the events loop closes the channel once the request has been handled
+	<-done
+	return request.err
+}
+
+// Close queues a close request on the events channel and blocks until it has been
+// processed. It is a no-op returning nil when the consumer is not in the ready state.
+func (pc *partitionConsumer) Close() error {
+	if pc.state == consumerReady {
+		request := &closeRequest{doneCh: make(chan struct{})}
+		pc.eventsCh <- request
+
+		// the events loop closes the channel once the request has been handled
+		<-request.doneCh
+		return request.err
+	}
+	return nil
+}
+
+// internalAck sends an individual ack for the message id carried by req without
+// waiting for a broker response. Any failure is logged and stored on the request;
+// doneCh is closed on return.
+func (pc *partitionConsumer) internalAck(req *ackRequest) {
+	defer close(req.doneCh)
+
+	id := &pb.MessageIdData{}
+	if err := proto.Unmarshal(req.msgID.Serialize(), id); err != nil {
+		pc.log.WithError(err).Error("unable to serialize message id")
+		req.err = err
+		// never ack a message id that could not be decoded
+		return
+	}
+
+	requestID := internal.RequestIDNoResponse
+	cmdAck := &pb.CommandAck{
+		ConsumerId: proto.Uint64(pc.consumerID),
+		MessageId:  []*pb.MessageIdData{id},
+		AckType:    pb.CommandAck_Individual.Enum(),
+	}
+	_, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, requestID, pb.BaseCommand_ACK, cmdAck)
+	if err != nil {
+		pc.log.WithError(err).Errorf("failed to ack message_id=%s", id)
+		req.err = err
+	}
+}
+
+// MessageReceived decodes the messages contained in headersAndPayload and queues
+// them, as one slice, for the dispatcher. The number of messages is taken from the
+// batch size in the message metadata, defaulting to 1 when it is not set. An error
+// is returned when the metadata or any message cannot be read.
+func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
+	pbMsgID := response.GetMessageId()
+
+	reader := internal.NewMessageReader(headersAndPayload.ReadableSlice())
+	msgMeta, err := reader.ReadMessageMetadata()
+	if err != nil {
+		// TODO send discardCorruptedMessage
+		return err
+	}
+
+	batchSize := 1
+	if msgMeta.NumMessagesInBatch != nil {
+		batchSize = int(msgMeta.GetNumMessagesInBatch())
+	}
+
+	batch := make([]*message, batchSize)
+	for idx := range batch {
+		smm, payload, err := reader.ReadMessage()
+		if err != nil {
+			// TODO send corrupted message
+			return err
+		}
+
+		// fields that do not depend on the presence of single message metadata
+		msg := &message{
+			publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+			topic:       pc.topic,
+			msgID:       newMessageID(int64(pbMsgID.GetLedgerId()), int64(pbMsgID.GetEntryId()), idx, pc.partitionIdx),
+			payLoad:     payload,
+		}
+
+		// prefer the single message metadata, fall back to the message metadata
+		if smm != nil {
+			msg.eventTime = timeFromUnixTimestampMillis(smm.GetEventTime())
+			msg.key = smm.GetPartitionKey()
+			msg.properties = internal.ConvertToStringMap(smm.GetProperties())
+		} else {
+			msg.eventTime = timeFromUnixTimestampMillis(msgMeta.GetEventTime())
+			msg.key = msgMeta.GetPartitionKey()
+			msg.properties = internal.ConvertToStringMap(msgMeta.GetProperties())
+		}
+
+		batch[idx] = msg
+	}
+
+	// send messages to the dispatcher
+	pc.queueCh <- batch
+	return nil
+}
+
+// ConnectionClosed queues a connectionClosed event so that the events loop
+// reconnects to the broker.
+func (pc *partitionConsumer) ConnectionClosed() {
+	// Trigger reconnection in the consumer goroutine
+	pc.eventsCh <- &connectionClosed{}
+}
+
+// internalFlow sends a flow command granting the broker the given number of
+// message permits, without waiting for a response.
+//
+// Flow command gives additional permits to send messages to the consumer.
+// A typical consumer implementation will use a queue to accumulate these messages
+// before the application is ready to consume them. After the consumer is ready,
+// the client needs to give permission to the broker to push messages.
+//
+// Requesting zero permits is an error.
+func (pc *partitionConsumer) internalFlow(permits uint32) error {
+	if permits == 0 {
+		return fmt.Errorf("invalid number of permits requested: %d", permits)
+	}
+
+	flow := &pb.CommandFlow{
+		ConsumerId:     proto.Uint64(pc.consumerID),
+		MessagePermits: proto.Uint32(permits),
+	}
+	_, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, internal.RequestIDNoResponse,
+		pb.BaseCommand_FLOW, flow)
+	return err
+}
+
+// dispatcher manages the internal message queue channel
+// and manages the flow control.
+//
+// It runs until closeCh is closed or one of connectedCh / queueCh is closed. On every
+// iteration exactly one of the two data channels is enabled: while a batch still has
+// undelivered messages only the send on messageCh is armed, otherwise only the
+// receive from queueCh is. A nil channel in a select is never chosen, which is how
+// the other case is disabled.
+func (pc *partitionConsumer) dispatcher() {
+	defer func() {
+		pc.log.Info("exiting dispatch loop")
+	}()
+	// messages holds the not yet delivered remainder of the current batch
+	var messages []*message
+	for {
+		// both start nil (disabled) and at most one is armed below
+		var queueCh chan []*message
+		var messageCh chan ConsumerMessage
+		var nextMessage ConsumerMessage
+
+		// are there more messages to send?
+		if len(messages) > 0 {
+			nextMessage = ConsumerMessage{
+				Consumer: pc.parentConsumer,
+				Message:  messages[0],
+			}
+			messageCh = pc.messageCh
+		} else {
+			// we are ready for more messages
+			queueCh = pc.queueCh
+		}
+
+		select {
+		case <-pc.closeCh:
+			return
+
+		case _, ok := <-pc.connectedCh:
+			if !ok {
+				return
+			}
+			pc.log.Debug("dispatcher received connection event")
+
+			// drain messages
+			messages = nil
+
+			// drain the message queue on any new connection by sending a
+			// special nil message to the channel so we know when to stop dropping messages
+			go func() {
+				pc.queueCh <- nil
+			}()
+			for m := range pc.queueCh {
+				// the queue has been drained
+				if m == nil {
+					break
+				}
+			}
+
+			// reset available permits
+			pc.availablePermits = 0
+			initialPermits := uint32(pc.queueSize)
+
+			pc.log.Debugf("dispatcher requesting initial permits=%d", initialPermits)
+			// send initial permits
+			if err := pc.internalFlow(initialPermits); err != nil {
+				pc.log.WithError(err).Error("unable to send initial permits to broker")
+			}
+
+		case msgs, ok := <-queueCh:
+			if !ok {
+				return
+			}
+			// we only read messages here after the consumer has processed all messages
+			// in the previous batch
+			messages = msgs
+
+		// if the messageCh is nil or the messageCh is full this will not be selected
+		case messageCh <- nextMessage:
+			// allow this message to be garbage collected
+			messages[0] = nil
+			messages = messages[1:]
+
+			// TODO implement a better flow controller
+			// send more permits if needed
+			pc.availablePermits++
+			// request more permits once half of the queue size (at least 1) has been delivered
+			flowThreshold := int32(math.Max(float64(pc.queueSize/2), 1))
+			if pc.availablePermits >= flowThreshold {
+				availablePermits := pc.availablePermits
+				requestedPermits := availablePermits
+				pc.availablePermits = 0
+
+				pc.log.Debugf("requesting more permits=%d available=%d", requestedPermits, availablePermits)
+				if err := pc.internalFlow(uint32(requestedPermits)); err != nil {
+					pc.log.WithError(err).Error("unable to send permits")
+				}
+			}
+		}
+	}
+}
+
+// ackRequest asks the events loop to ack msgID. doneCh is closed when the
+// request has been handled and err holds the outcome.
+type ackRequest struct {
+	doneCh chan struct{}
+	msgID  MessageID
+	err    error
+}
+
+// unsubscribeRequest asks the events loop to unsubscribe the consumer. doneCh is
+// closed when the request has been handled and err holds the outcome.
+type unsubscribeRequest struct {
+	doneCh chan struct{}
+	err    error
+}
+
+// closeRequest asks the events loop to close the consumer. doneCh is closed
+// when the request has been handled and err holds the outcome.
+type closeRequest struct {
+	doneCh chan struct{}
+	err    error
+}
+
+// runEventsLoop processes the requests queued on eventsCh one at a time.
+// It exits when closeCh is closed or after a close request has been handled.
+func (pc *partitionConsumer) runEventsLoop() {
+	defer func() {
+		pc.log.Info("exiting events loop")
+	}()
+	for {
+		select {
+		case <-pc.closeCh:
+			return
+		case event := <-pc.eventsCh:
+			switch request := event.(type) {
+			case *closeRequest:
+				// a close request always ends the loop, whether or not it succeeded
+				pc.internalClose(request)
+				return
+			case *connectionClosed:
+				pc.reconnectToBroker()
+			case *unsubscribeRequest:
+				pc.internalUnsubscribe(request)
+			case *ackRequest:
+				pc.internalAck(request)
+			}
+		}
+	}
+}
+
+// internalClose sends the close consumer command to the broker and waits for the
+// response. On success the consumer is marked closed, its consume handler is removed
+// from the connection and closeCh is closed. On failure the error is stored on the
+// request. Nothing is done when the consumer is not in the ready state. doneCh is
+// closed on return in every case.
+func (pc *partitionConsumer) internalClose(req *closeRequest) {
+	defer close(req.doneCh)
+	if pc.state != consumerReady {
+		return
+	}
+
+	pc.state = consumerClosing
+	pc.log.Infof("Closing consumer=%d", pc.consumerID)
+
+	// the request id must come from the request id generator, as for the
+	// subscribe and unsubscribe requests, not from the consumer id generator
+	requestID := pc.client.rpcClient.NewRequestID()
+	cmdClose := &pb.CommandCloseConsumer{
+		ConsumerId: proto.Uint64(pc.consumerID),
+		RequestId:  proto.Uint64(requestID),
+	}
+	_, err := pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_CLOSE_CONSUMER, cmdClose)
+	if err != nil {
+		req.err = err
+		return
+	}
+
+	pc.log.Info("Closed consumer")
+	pc.state = consumerClosed
+	pc.conn.DeleteConsumeHandler(pc.consumerID)
+	close(pc.closeCh)
+}
+
+// reconnectToBroker retries grabConn, sleeping for the next backoff interval before
+// each attempt, until the connection succeeds or the consumer leaves the ready state.
+func (pc *partitionConsumer) reconnectToBroker() {
+	backoff := internal.Backoff{}
+	// stop retrying as soon as the consumer is closing or closed
+	for pc.state == consumerReady {
+		delay := backoff.Next()
+		pc.log.Info("Reconnecting to broker in ", delay)
+		time.Sleep(delay)
+
+		if err := pc.grabConn(); err != nil {
+			continue
+		}
+
+		// Successfully reconnected
+		pc.log.Info("Reconnected consumer to broker")
+		return
+	}
+}
+
+// grabConn looks up the broker for the topic, sends the subscribe command and, on
+// success, stores the connection, registers pc as the consume handler for its
+// consumer id and notifies the dispatcher through connectedCh.
+//
+// It returns an error when the lookup or the request fails, when the broker
+// answers with an error, or when the response type is unexpected.
+func (pc *partitionConsumer) grabConn() error {
+	lr, err := pc.client.lookupService.Lookup(pc.topic)
+	if err != nil {
+		pc.log.WithError(err).Warn("Failed to lookup topic")
+		return err
+	}
+	pc.log.Debugf("Lookup result: %+v", lr)
+
+	subType := toProtoSubType(pc.options.subscriptionType)
+	initialPosition := toProtoInitialPosition(pc.options.subscriptionInitPos)
+	requestID := pc.client.rpcClient.NewRequestID()
+	cmdSubscribe := &pb.CommandSubscribe{
+		RequestId:       proto.Uint64(requestID),
+		Topic:           proto.String(pc.topic),
+		SubType:         subType.Enum(),
+		Subscription:    proto.String(pc.options.subscription),
+		ConsumerId:      proto.Uint64(pc.consumerID),
+		ConsumerName:    proto.String(pc.name),
+		InitialPosition: initialPosition.Enum(),
+		Schema:          nil,
+	}
+
+	res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
+		pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
+
+	if err != nil {
+		pc.log.WithError(err).Error("Failed to create consumer")
+		return err
+	}
+
+	// NOTE(review): the consumer name is taken from ConsumerStatsResponse when present —
+	// confirm this is the response field the broker fills in for a subscribe request
+	if res.Response.ConsumerStatsResponse != nil {
+		pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
+	}
+
+	// NOTE(review): the connection is stored and the handler registered before the
+	// response type is checked, so they stay in place on the error paths below
+	pc.conn = res.Cnx
+	pc.log.Info("Connected consumer")
+	pc.conn.AddConsumeHandler(pc.consumerID, pc)
+
+	msgType := res.Response.GetType()
+
+	switch msgType {
+	case pb.BaseCommand_SUCCESS:
+		// notify the dispatcher we have connection
+		// (sent from a goroutine because connectedCh is unbuffered)
+		go func() {
+			pc.connectedCh <- struct{}{}
+		}()
+		return nil
+	case pb.BaseCommand_ERROR:
+		errMsg := res.Response.GetError()
+		return fmt.Errorf("%s: %s", errMsg.GetError().String(), errMsg.GetMessage())
+	default:
+		return newUnexpectedErrMsg(msgType, requestID)
+	}
+}
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index e6f4d52..bbc1315 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -22,11 +22,9 @@ import (
 	"fmt"
 	"log"
 	"net/http"
-	"strings"
 	"testing"
 	"time"
 
-	"github.com/apache/pulsar-client-go/util"
 	"github.com/stretchr/testify/assert"
 )
 
@@ -96,11 +94,6 @@ func TestProducerConsumer(t *testing.T) {
 			log.Fatal(err)
 		}
 	}
-
-	// unsubscribe consumer
-	if err := consumer.Unsubscribe(); err != nil {
-		log.Fatal(err)
-	}
 }
 
 func TestConsumerConnectError(t *testing.T) {
@@ -176,13 +169,6 @@ func TestBatchMessageReceive(t *testing.T) {
 		count++
 	}
 
-	// check strategically
-	for i := 0; i < 3; i++ {
-		if count == numOfMessages {
-			break
-		}
-		time.Sleep(time.Second)
-	}
 	assert.Equal(t, count, numOfMessages)
 }
 
@@ -202,7 +188,7 @@ func TestConsumerWithInvalidConf(t *testing.T) {
 		Topic: "my-topic",
 	})
 
-	// Expect error in creating cosnumer
+	// Expect error in creating consumer
 	assert.Nil(t, consumer)
 	assert.NotNil(t, err)
 
@@ -220,7 +206,7 @@ func TestConsumerWithInvalidConf(t *testing.T) {
 	assert.Equal(t, err.(*Error).Result(), TopicNotFound)
 }
 
-func TestConsumer_SubscriptionEarliestPos(t *testing.T) {
+func TestConsumerSubscriptionEarliestPosition(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL: lookupURL,
 	})
@@ -238,9 +224,8 @@ func TestConsumer_SubscriptionEarliestPos(t *testing.T) {
 	assert.Nil(t, err)
 	defer producer.Close()
 
-	//sent message
+	// send message
 	ctx := context.Background()
-
 	err = producer.Send(ctx, &ProducerMessage{
 		Payload: []byte("msg-1-content-1"),
 	})
@@ -253,9 +238,9 @@ func TestConsumer_SubscriptionEarliestPos(t *testing.T) {
 
 	// create consumer
 	consumer, err := client.Subscribe(ConsumerOptions{
-		Topic:               topicName,
-		SubscriptionName:    subName,
-		SubscriptionInitPos: Earliest,
+		Topic:                       topicName,
+		SubscriptionName:            subName,
+		SubscriptionInitialPosition: SubscriptionPositionEarliest,
 	})
 	assert.Nil(t, err)
 	defer consumer.Close()
@@ -266,24 +251,6 @@ func TestConsumer_SubscriptionEarliestPos(t *testing.T) {
 	assert.Equal(t, "msg-1-content-1", string(msg.Payload()))
 }
 
-func makeHTTPCall(t *testing.T, method string, urls string, body string) {
-	client := http.Client{}
-
-	req, err := http.NewRequest(method, urls, strings.NewReader(body))
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	req.Header.Set("Content-Type", "application/json")
-	req.Header.Set("Accept", "application/json")
-
-	res, err := client.Do(req)
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer res.Body.Close()
-}
-
 func TestConsumerKeyShared(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL: lookupURL,
@@ -311,7 +278,8 @@ func TestConsumerKeyShared(t *testing.T) {
 
 	// create producer
 	producer, err := client.CreateProducer(ProducerOptions{
-		Topic: topic,
+		Topic:           topic,
+		DisableBatching: true,
 	})
 	assert.Nil(t, err)
 	defer producer.Close()
@@ -325,33 +293,32 @@ func TestConsumerKeyShared(t *testing.T) {
 		assert.Nil(t, err)
 	}
 
-	time.Sleep(time.Second * 1)
-
-	go func() {
-		for i := 0; i < 10; i++ {
-			msg, err := consumer1.Receive(ctx)
-			assert.Nil(t, err)
-			if msg != nil {
-				fmt.Printf("consumer1 key is: %s, value is: %s\n", msg.Key(), string(msg.Payload()))
-				err = consumer1.Ack(msg)
-				assert.Nil(t, err)
+	receivedConsumer1 := 0
+	receivedConsumer2 := 0
+	for (receivedConsumer1 + receivedConsumer2) < 10 {
+		select {
+		case cm, ok := <-consumer1.Chan():
+			if !ok {
+				break
 			}
-		}
-	}()
-
-	go func() {
-		for i := 0; i < 10; i++ {
-			msg2, err := consumer2.Receive(ctx)
-			assert.Nil(t, err)
-			if msg2 != nil {
-				fmt.Printf("consumer2 key is:%s, value is: %s\n", msg2.Key(), string(msg2.Payload()))
-				err = consumer2.Ack(msg2)
-				assert.Nil(t, err)
+			receivedConsumer1++
+			if err := consumer1.Ack(cm.Message); err != nil {
+				log.Fatal(err)
+			}
+		case cm, ok := <-consumer2.Chan():
+			if !ok {
+				break
+			}
+			receivedConsumer2++
+			if err := consumer2.Ack(cm.Message); err != nil {
+				log.Fatal(err)
 			}
 		}
-	}()
+	}
 
-	time.Sleep(time.Second * 1)
+	fmt.Printf("TestConsumerKeyShared received messages consumer1: %d consumser2: %d\n",
+		receivedConsumer1, receivedConsumer2)
+	assert.Equal(t, 10, receivedConsumer1+receivedConsumer2)
 }
 
 func TestPartitionTopicsConsumerPubSub(t *testing.T) {
@@ -414,77 +381,16 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
 	assert.Equal(t, len(msgs), 10)
 }
 
-func TestConsumer_ReceiveAsync(t *testing.T) {
+func TestConsumerReceiveTimeout(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL: lookupURL,
 	})
-
 	assert.Nil(t, err)
 	defer client.Close()
 
-	topicName := "persistent://public/default/receive-async"
-	subName := "subscription-receive-async"
-	ctx := context.Background()
-	ch := make(chan ConsumerMessage, 10)
-
-	// create producer
-	producer, err := client.CreateProducer(ProducerOptions{
-		Topic: topicName,
-	})
-	assert.Nil(t, err)
-	defer producer.Close()
-
-	consumer, err := client.Subscribe(ConsumerOptions{
-		Topic:            topicName,
-		SubscriptionName: subName,
-	})
-	assert.Nil(t, err)
-	defer consumer.Close()
-
-	//send 10 messages
-	for i := 0; i < 10; i++ {
-		err = producer.Send(ctx, &ProducerMessage{
-			Payload: []byte(fmt.Sprintf("hello-%d", i)),
-		})
-		assert.Nil(t, err)
-	}
-
-	//receive async 10 messages
-	err = consumer.ReceiveAsync(ctx, ch)
-	assert.Nil(t, err)
-
-	payloadList := make([]string, 0, 10)
-
-RECEIVE:
-	for {
-		select {
-		case cMsg, ok := <-ch:
-			if ok {
-				fmt.Printf("receive message payload is:%s\n", string(cMsg.Payload()))
-				assert.Equal(t, topicName, cMsg.Message.Topic())
-				assert.Equal(t, topicName, cMsg.Consumer.Topic())
-				payloadList = append(payloadList, string(cMsg.Message.Payload()))
-				if len(payloadList) == 10 {
-					break RECEIVE
-				}
-			}
-			continue RECEIVE
-		case <-ctx.Done():
-			t.Error("context error.")
-			return
-		}
-	}
-}
-
-func TestConsumerAckTimeout(t *testing.T) {
-	client, err := NewClient(ClientOptions{
-		URL: lookupURL,
-	})
-	assert.Nil(t, err)
-	defer client.Close()
-
-	topic := "test-ack-timeout-topic-1"
-	ctx := context.Background()
+	topic := "test-topic-with-no-messages"
+	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+	defer cancel()
 
 	// create consumer
 	consumer, err := client.Subscribe(ConsumerOptions{
@@ -496,155 +402,12 @@ func TestConsumerAckTimeout(t *testing.T) {
 	assert.Nil(t, err)
 	defer consumer.Close()
 
-	// create consumer1
-	consumer1, err := client.Subscribe(ConsumerOptions{
-		Topic:            topic,
-		SubscriptionName: "my-sub2",
-		Type:             Shared,
-		AckTimeout:       5 * 1000,
-	})
-	assert.Nil(t, err)
-	defer consumer1.Close()
-
-	// create producer
-	producer, err := client.CreateProducer(ProducerOptions{
-		Topic:           topic,
-		DisableBatching: true,
-	})
-	assert.Nil(t, err)
-	defer producer.Close()
-
-	// send 10 messages
-	for i := 0; i < 10; i++ {
-		if err := producer.Send(ctx, &ProducerMessage{
-			Payload: []byte(fmt.Sprintf("hello-%d", i)),
-		}); err != nil {
-			log.Fatal(err)
-		}
-	}
-
-	// consumer receive 10 messages
-	payloadList := make([]string, 0, 10)
-	for i := 0; i < 10; i++ {
-		msg, err := consumer.Receive(context.Background())
-		if err != nil {
-			log.Fatal(err)
-		}
-		payloadList = append(payloadList, string(msg.Payload()))
-
-		// not ack message
-	}
-	assert.Equal(t, 10, len(payloadList))
-
-	// consumer1 receive 10 messages
-	for i := 0; i < 10; i++ {
-		msg, err := consumer1.Receive(context.Background())
-		if err != nil {
-			log.Fatal(err)
-		}
-
-		payloadList = append(payloadList, string(msg.Payload()))
-
-		// ack half of the messages
-		if i%2 == 0 {
-			err = consumer1.Ack(msg)
-			assert.Nil(t, err)
-		}
-	}
-
-	// wait ack timeout
-	time.Sleep(6 * time.Second)
-
-	fmt.Println("start redeliver messages...")
-
-	payloadList = make([]string, 0, 10)
-	// consumer receive messages again
-	for i := 0; i < 10; i++ {
-		msg, err := consumer.Receive(context.Background())
-		if err != nil {
-			log.Fatal(err)
-		}
-		payloadList = append(payloadList, string(msg.Payload()))
-
-		// ack message
-		if err := consumer.Ack(msg); err != nil {
-			log.Fatal(err)
-		}
-	}
-	assert.Equal(t, 10, len(payloadList))
-
-	payloadList = make([]string, 0, 5)
-	// consumer1 receive messages again
-	go func() {
-		for i := 0; i < 10; i++ {
-			msg, err := consumer1.Receive(context.Background())
-			if err != nil {
-				log.Fatal(err)
-			}
-
-			expectMsg := fmt.Sprintf("hello-%d", i)
-			fmt.Printf("redeliver messages, payload is:%s\n", expectMsg)
-			payloadList = append(payloadList, string(msg.Payload()))
-
-			// ack message
-			if err := consumer1.Ack(msg); err != nil {
-				log.Fatal(err)
-			}
-		}
-		assert.Equal(t, 5, len(payloadList))
-	}()
-
-	// sleep 2 seconds, wait gorutine receive messages.
-	time.Sleep(time.Second * 2)
-}
-
-func TestConsumer_ReceiveAsyncWithCallback(t *testing.T) {
-	client, err := NewClient(ClientOptions{
-		URL: lookupURL,
-	})
-
-	assert.Nil(t, err)
-	defer client.Close()
-
-	topicName := "persistent://public/default/receive-async-with-callback"
-	subName := "subscription-receive-async"
-	ctx := context.Background()
-
-	// create producer
-	producer, err := client.CreateProducer(ProducerOptions{
-		Topic: topicName,
-	})
-	assert.Nil(t, err)
-	defer producer.Close()
-
-	consumer, err := client.Subscribe(ConsumerOptions{
-		Topic:            topicName,
-		SubscriptionName: subName,
-	})
-	assert.Nil(t, err)
-	defer consumer.Close()
-
-	//send 10 messages
-	for i := 0; i < 10; i++ {
-		err := producer.Send(ctx, &ProducerMessage{
-			Payload: []byte(fmt.Sprintf("hello-%d", i)),
-		})
-		assert.Nil(t, err)
-	}
-
-	for i := 0; i < 10; i++ {
-		tmpMsg := fmt.Sprintf("hello-%d", i)
-		consumer.ReceiveAsyncWithCallback(ctx, func(msg Message, err error) {
-			if err != nil {
-				log.Fatal(err)
-			}
-			fmt.Printf("receive message payload is:%s\n", string(msg.Payload()))
-			assert.Equal(t, tmpMsg, string(msg.Payload()))
-		})
-	}
+	msg, err := consumer.Receive(ctx)
+	assert.Nil(t, msg)
+	assert.NotNil(t, err)
 }
 
-func TestConsumer_Shared(t *testing.T) {
+func TestConsumerShared(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL: lookupURL,
 	})
@@ -682,116 +445,49 @@ func TestConsumer_Shared(t *testing.T) {
 	assert.Nil(t, err)
 	defer producer.Close()
 
-	// send 10 messages
+	// send 10 messages with unique payloads
 	for i := 0; i < 10; i++ {
 		if err := producer.Send(context.Background(), &ProducerMessage{
 			Payload: []byte(fmt.Sprintf("hello-%d", i)),
 		}); err != nil {
 			log.Fatal(err)
 		}
+		fmt.Println("sending message:", fmt.Sprintf("hello-%d", i))
 	}
 
-	msgList := make([]string, 0, 5)
-	for i := 0; i < 5; i++ {
-		msg, err := consumer1.Receive(context.Background())
-		if err != nil {
-			log.Fatal(err)
-		}
-		fmt.Printf("consumer1 msg id is: %v, value is: %s\n", msg.ID(), string(msg.Payload()))
-		msgList = append(msgList, string(msg.Payload()))
-		if err := consumer1.Ack(msg); err != nil {
-			log.Fatal(err)
-		}
-	}
-
-	assert.Equal(t, 5, len(msgList))
-
-	for i := 0; i < 5; i++ {
-		msg, err := consumer2.Receive(context.Background())
-		if err != nil {
-			log.Fatal(err)
-		}
-		if err := consumer2.Ack(msg); err != nil {
-			log.Fatal(err)
-		}
-		fmt.Printf("consumer2 msg id is: %v, value is: %s\n", msg.ID(), string(msg.Payload()))
-		msgList = append(msgList, string(msg.Payload()))
-	}
-
-	assert.Equal(t, 10, len(msgList))
-	res := util.RemoveDuplicateElement(msgList)
-	assert.Equal(t, 10, len(res))
-}
-
-func TestConsumer_Seek(t *testing.T) {
-	client, err := NewClient(ClientOptions{
-		URL: lookupURL,
-	})
-
-	assert.Nil(t, err)
-	defer client.Close()
-
-	topicName := "persistent://public/default/testSeek"
-	testURL := adminURL + "/" + "admin/v2/persistent/public/default/testSeek"
-	makeHTTPCall(t, http.MethodPut, testURL, "1")
-	subName := "sub-testSeek"
-
-	producer, err := client.CreateProducer(ProducerOptions{
-		Topic: topicName,
-	})
-	assert.Nil(t, err)
-	assert.Equal(t, producer.Topic(), topicName)
-	defer producer.Close()
-
-	consumer, err := client.Subscribe(ConsumerOptions{
-		Topic:            topicName,
-		SubscriptionName: subName,
-	})
-	assert.Nil(t, err)
-	assert.Equal(t, consumer.Topic(), topicName)
-	assert.Equal(t, consumer.Subscription(), subName)
-	defer consumer.Close()
-
-	ctx := context.Background()
-
-	// Send 10 messages synchronously
-	t.Log("Publishing 10 messages synchronously")
-	for msgNum := 0; msgNum < 10; msgNum++ {
-		if err := producer.Send(ctx, &ProducerMessage{
-			Payload: []byte(fmt.Sprintf("msg-content-%d", msgNum)),
-		}); err != nil {
-			t.Fatal(err)
-		}
-	}
-
-	t.Log("Trying to receive 10 messages")
-	idList := make([]MessageID, 0, 10)
-	for msgNum := 0; msgNum < 10; msgNum++ {
-		msg, err := consumer.Receive(ctx)
-		assert.Nil(t, err)
-		idList = append(idList, msg.ID())
-		fmt.Println(string(msg.Payload()))
-	}
-
-	for index, id := range idList {
-		if index == 4 {
-			// seek to fourth message, expected receive fourth message.
-			err = consumer.Seek(id)
-			assert.Nil(t, err)
-			break
+	readMsgs := 0
+	messages := make(map[string]struct{})
+	for readMsgs < 10 {
+		select {
+		case cm, ok := <-consumer1.Chan():
+			if !ok {
+				break
+			}
+			readMsgs++
+			payload := string(cm.Message.Payload())
+			messages[payload] = struct{}{}
+			fmt.Printf("consumer1 msg id is: %v, value is: %s\n", cm.Message.ID(), payload)
+			if err := consumer1.Ack(cm.Message); err != nil {
+				log.Fatal(err)
+			}
+		case cm, ok := <-consumer2.Chan():
+			if !ok {
+				break
+			}
+			readMsgs++
+			payload := string(cm.Message.Payload())
+			messages[payload] = struct{}{}
+			fmt.Printf("consumer2 msg id is: %v, value is: %s\n", cm.Message.ID(), payload)
+			if err := consumer2.Ack(cm.Message); err != nil {
+				log.Fatal(err)
+			}
 		}
 	}
 
-	// Sleeping for 500ms to wait for consumer re-connect
-	time.Sleep(500 * time.Millisecond)
-
-	msg, err := consumer.Receive(ctx)
-	assert.Nil(t, err)
-	t.Logf("again received message:%+v", msg.ID())
-	assert.Equal(t, "msg-content-4", string(msg.Payload()))
+	assert.Equal(t, 10, len(messages))
 }
 
-func TestConsumer_EventTime(t *testing.T) {
+func TestConsumerEventTime(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL: lookupURL,
 	})
@@ -828,7 +524,7 @@ func TestConsumer_EventTime(t *testing.T) {
 	assert.Equal(t, "test", string(msg.Payload()))
 }
 
-func TestConsumer_Flow(t *testing.T) {
+func TestConsumerFlow(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL: lookupURL,
 	})
diff --git a/util/error.go b/pulsar/helper.go
similarity index 84%
rename from util/error.go
rename to pulsar/helper.go
index 755b7c0..83975f4 100644
--- a/util/error.go
+++ b/pulsar/helper.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package util
+package pulsar
 
 import (
 	"fmt"
@@ -26,24 +26,24 @@ import (
 // NewUnexpectedErrMsg instantiates an ErrUnexpectedMsg error.
 // Optionally provide a list of IDs associated with the message
 // for additional context in the error message.
-func NewUnexpectedErrMsg(msgType pb.BaseCommand_Type, ids ...interface{}) *UnexpectedErrMsg {
-	return &UnexpectedErrMsg{
+func newUnexpectedErrMsg(msgType pb.BaseCommand_Type, ids ...interface{}) *unexpectedErrMsg {
+	return &unexpectedErrMsg{
 		msgType: msgType,
 		ids:     ids,
 	}
 }
 
 // UnexpectedErrMsg is returned when an unexpected message is received.
-type UnexpectedErrMsg struct {
+type unexpectedErrMsg struct {
 	msgType pb.BaseCommand_Type
 	ids     []interface{}
 }
 
 // Error satisfies the error interface.
-func (e *UnexpectedErrMsg) Error() string {
+func (e *unexpectedErrMsg) Error() string {
 	msg := fmt.Sprintf("received unexpected message of type %q", e.msgType.String())
 	for _, id := range e.ids {
-		msg += fmt.Sprintf(" id=%v", id)
+		msg += fmt.Sprintf(" consumerID=%v", id)
 	}
 	return msg
 }
diff --git a/pulsar/impl_client.go b/pulsar/impl_client.go
index 7077155..53c2074 100644
--- a/pulsar/impl_client.go
+++ b/pulsar/impl_client.go
@@ -21,12 +21,13 @@ import (
 	"fmt"
 	"net/url"
 
-	"github.com/apache/pulsar-client-go/pkg/auth"
-	"github.com/apache/pulsar-client-go/pkg/pb"
-	"github.com/apache/pulsar-client-go/pulsar/internal"
 	"github.com/pkg/errors"
 
 	log "github.com/sirupsen/logrus"
+
+	"github.com/apache/pulsar-client-go/pkg/auth"
+	"github.com/apache/pulsar-client-go/pkg/pb"
+	"github.com/apache/pulsar-client-go/pulsar/internal"
 )
 
 type client struct {
@@ -97,7 +98,7 @@ func (client *client) CreateProducer(options ProducerOptions) (Producer, error)
 }
 
 func (client *client) Subscribe(options ConsumerOptions) (Consumer, error) {
-	consumer, err := newConsumer(client, &options)
+	consumer, err := newConsumer(client, options)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pulsar/impl_consumer.go b/pulsar/impl_consumer.go
deleted file mode 100644
index 28535b5..0000000
--- a/pulsar/impl_consumer.go
+++ /dev/null
@@ -1,316 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package pulsar
-
-import (
-	"context"
-	"errors"
-	"fmt"
-
-	"github.com/apache/pulsar-client-go/pkg/pb"
-	"github.com/apache/pulsar-client-go/util"
-	"github.com/golang/protobuf/proto"
-
-	log "github.com/sirupsen/logrus"
-)
-
-type consumer struct {
-	topicName       string
-	consumers       []Consumer
-	log             *log.Entry
-	queue           chan ConsumerMessage
-	unackMsgTracker *UnackedMessageTracker
-}
-
-func newConsumer(client *client, options *ConsumerOptions) (*consumer, error) {
-	if options == nil {
-		return nil, newError(ResultInvalidConfiguration, "consumer configuration undefined")
-	}
-
-	if options.Topic == "" && options.Topics == nil && options.TopicsPattern == "" {
-		return nil, newError(TopicNotFound, "topic is required")
-	}
-
-	if options.SubscriptionName == "" {
-		return nil, newError(SubscriptionNotFound, "subscription name is required for consumer")
-	}
-
-	if options.ReceiverQueueSize == 0 {
-		options.ReceiverQueueSize = 1000
-	}
-
-	if options.TopicsPattern != "" {
-		if options.Topics != nil {
-			return nil, newError(ResultInvalidConfiguration, "Topic names list must be null when use topicsPattern")
-		}
-		// TODO: impl logic
-	} else if options.Topics != nil && len(options.Topics) > 1 {
-		// TODO: impl logic
-	} else if options.Topics != nil && len(options.Topics) == 1 || options.Topic != "" {
-		var singleTopicName string
-		if options.Topic != "" {
-			singleTopicName = options.Topic
-		} else {
-			singleTopicName = options.Topics[0]
-		}
-		return singleTopicSubscribe(client, options, singleTopicName)
-	}
-
-	return nil, newError(ResultInvalidTopicName, "topic name is required for consumer")
-}
-
-func singleTopicSubscribe(client *client, options *ConsumerOptions, topic string) (*consumer, error) {
-	c := &consumer{
-		topicName: topic,
-		log:       log.WithField("topic", topic),
-		queue:     make(chan ConsumerMessage, options.ReceiverQueueSize),
-	}
-
-	partitions, err := client.TopicPartitions(topic)
-	if err != nil {
-		return nil, err
-	}
-
-	numPartitions := len(partitions)
-	c.consumers = make([]Consumer, numPartitions)
-
-	type ConsumerError struct {
-		err       error
-		partition int
-		cons      Consumer
-	}
-
-	ch := make(chan ConsumerError, numPartitions)
-
-	for partitionIdx, partitionTopic := range partitions {
-		// this needs to be created outside in the same go routine since
-		// newPartitionConsumer can modify the shared options struct causing a race condition
-		cons, err := newPartitionConsumer(client, partitionTopic, options, partitionIdx, numPartitions, c.queue)
-		go func(partitionIdx int, partitionTopic string) {
-			ch <- ConsumerError{
-				err:       err,
-				partition: partitionIdx,
-				cons:      cons,
-			}
-		}(partitionIdx, partitionTopic)
-	}
-
-	for i := 0; i < numPartitions; i++ {
-		ce, ok := <-ch
-		if ok {
-			err = ce.err
-			c.consumers[ce.partition] = ce.cons
-		}
-	}
-
-	if err != nil {
-		// Since there were some failures, cleanup all the partitions that succeeded in creating the consumers
-		for _, consumer := range c.consumers {
-			if !util.IsNil(consumer) {
-				if err = consumer.Close(); err != nil {
-					panic("close consumer error, please check.")
-				}
-			}
-		}
-		return nil, err
-	}
-
-	return c, nil
-}
-
-func (c *consumer) Topic() string {
-	return c.topicName
-}
-
-func (c *consumer) Subscription() string {
-	return c.consumers[0].Subscription()
-}
-
-func (c *consumer) Unsubscribe() error {
-	var errMsg string
-	for _, consumer := range c.consumers {
-		if err := consumer.Unsubscribe(); err != nil {
-			errMsg += fmt.Sprintf("topic %s, subscription %s: %s", c.Topic(), c.Subscription(), err)
-		}
-	}
-	if errMsg != "" {
-		return errors.New(errMsg)
-	}
-	return nil
-}
-
-func (c *consumer) getMessageFromSubConsumer(ctx context.Context) {
-	for _, pc := range c.consumers {
-		go func(pc Consumer) {
-			err := pc.ReceiveAsync(ctx, c.queue)
-			if err != nil {
-				return
-			}
-		}(pc)
-	}
-}
-
-func (c *consumer) Receive(ctx context.Context) (message Message, err error) {
-	if len(c.consumers) > 1 {
-		select {
-		case <-ctx.Done():
-			return nil, ctx.Err()
-		case cMsg, ok := <-c.queue:
-			if ok {
-				return cMsg.Message, nil
-			}
-			return nil, errors.New("receive message error")
-		}
-	}
-
-	return c.consumers[0].(*partitionConsumer).Receive(ctx)
-}
-
-func (c *consumer) ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error {
-	for _, pc := range c.consumers {
-		go func(pc Consumer) {
-			if err := pc.ReceiveAsync(ctx, msgs); err != nil {
-				c.log.Errorf("receive async messages error:%s, please check.", err.Error())
-				return
-			}
-		}(pc)
-	}
-
-	return nil
-}
-
-func (c *consumer) ReceiveAsyncWithCallback(ctx context.Context, callback func(msg Message, err error)) {
-	var err error
-	if len(c.consumers) > 1 {
-		select {
-		case <-ctx.Done():
-			c.log.Errorf("ReceiveAsyncWithCallback: receive message error:%s", ctx.Err().Error())
-			return
-		case cMsg, ok := <-c.queue:
-			if ok {
-				callback(cMsg.Message, err)
-			}
-			return
-		}
-	}
-	c.consumers[0].(*partitionConsumer).ReceiveAsyncWithCallback(ctx, callback)
-}
-
-//Ack the consumption of a single message
-func (c *consumer) Ack(msg Message) error {
-	return c.AckID(msg.ID())
-}
-
-// Ack the consumption of a single message, identified by its MessageID
-func (c *consumer) AckID(msgID MessageID) error {
-	id := &pb.MessageIdData{}
-	err := proto.Unmarshal(msgID.Serialize(), id)
-	if err != nil {
-		c.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
-		return err
-	}
-
-	partition := id.GetPartition()
-	if partition < 0 {
-		return c.consumers[0].AckID(msgID)
-	}
-	return c.consumers[partition].AckID(msgID)
-}
-
-func (c *consumer) AckCumulative(msg Message) error {
-	return c.AckCumulativeID(msg.ID())
-}
-
-func (c *consumer) AckCumulativeID(msgID MessageID) error {
-	id := &pb.MessageIdData{}
-	err := proto.Unmarshal(msgID.Serialize(), id)
-	if err != nil {
-		c.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
-		return err
-	}
-
-	partition := id.GetPartition()
-	if partition < 0 {
-		return errors.New("invalid partition index")
-	}
-	return c.consumers[partition].AckCumulativeID(msgID)
-}
-
-func (c *consumer) Close() error {
-	for _, pc := range c.consumers {
-		return pc.Close()
-	}
-	return nil
-}
-
-func (c *consumer) Seek(msgID MessageID) error {
-	id := &pb.MessageIdData{}
-	err := proto.Unmarshal(msgID.Serialize(), id)
-	if err != nil {
-		c.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
-		return err
-	}
-
-	partition := id.GetPartition()
-
-	// current topic is non-partition topic, we only need to get the first value in the consumers.
-	if partition < 0 {
-		partition = 0
-	}
-	return c.consumers[partition].Seek(msgID)
-}
-
-func (c *consumer) RedeliverUnackedMessages() error {
-	var errMsg string
-	for _, c := range c.consumers {
-		if err := c.RedeliverUnackedMessages(); err != nil {
-			errMsg += fmt.Sprintf("topic %s, subscription %s: %s", c.Topic(), c.Subscription(), err)
-		}
-	}
-
-	if errMsg != "" {
-		return errors.New(errMsg)
-	}
-	return nil
-}
-
-func toProtoSubType(st SubscriptionType) pb.CommandSubscribe_SubType {
-	switch st {
-	case Exclusive:
-		return pb.CommandSubscribe_Exclusive
-	case Shared:
-		return pb.CommandSubscribe_Shared
-	case Failover:
-		return pb.CommandSubscribe_Failover
-	case KeyShared:
-		return pb.CommandSubscribe_Key_Shared
-	}
-
-	return pb.CommandSubscribe_Exclusive
-}
-
-func toProtoInitialPosition(p InitialPosition) pb.CommandSubscribe_InitialPosition {
-	switch p {
-	case Latest:
-		return pb.CommandSubscribe_Latest
-	case Earliest:
-		return pb.CommandSubscribe_Earliest
-	}
-
-	return pb.CommandSubscribe_Latest
-}
diff --git a/pulsar/impl_partition_consumer.go b/pulsar/impl_partition_consumer.go
deleted file mode 100644
index 83e09ad..0000000
--- a/pulsar/impl_partition_consumer.go
+++ /dev/null
@@ -1,739 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package pulsar
-
-import (
-	"context"
-	"fmt"
-	"math"
-	"sync"
-	"time"
-
-	"github.com/golang/protobuf/proto"
-
-	log "github.com/sirupsen/logrus"
-
-	"github.com/apache/pulsar-client-go/pkg/pb"
-	"github.com/apache/pulsar-client-go/pulsar/internal"
-	"github.com/apache/pulsar-client-go/util"
-)
-
-const maxRedeliverUnacknowledged = 1000
-
-type consumerState int
-
-const (
-	consumerInit consumerState = iota
-	consumerReady
-	consumerClosing
-	consumerClosed
-)
-
-type partitionConsumer struct {
-	state  consumerState
-	client *client
-	topic  string
-	log    *log.Entry
-	cnx    internal.Connection
-
-	options      *ConsumerOptions
-	consumerName *string
-	consumerID   uint64
-	subQueue     chan ConsumerMessage
-
-	omu               sync.Mutex // protects following
-	redeliverMessages []*pb.MessageIdData
-
-	unAckTracker      *UnackedMessageTracker
-	receivedSinceFlow uint32
-
-	eventsChan   chan interface{}
-	partitionIdx int
-	partitionNum int
-}
-
-func newPartitionConsumer(client *client, topic string, options *ConsumerOptions, partitionID, partitionNum int, ch chan<- ConsumerMessage) (*partitionConsumer, error) {
-	c := &partitionConsumer{
-		state:             consumerInit,
-		client:            client,
-		topic:             topic,
-		options:           options,
-		log:               log.WithField("topic", topic),
-		consumerID:        client.rpcClient.NewConsumerID(),
-		partitionIdx:      partitionID,
-		partitionNum:      partitionNum,
-		eventsChan:        make(chan interface{}, 1),
-		subQueue:          make(chan ConsumerMessage, options.ReceiverQueueSize),
-		receivedSinceFlow: 0,
-	}
-
-	c.setDefault(options)
-
-	if options.MessageChannel == nil {
-		options.MessageChannel = make(chan ConsumerMessage, options.ReceiverQueueSize)
-	} else {
-		c.subQueue = options.MessageChannel
-	}
-
-	if options.Name != "" {
-		c.consumerName = &options.Name
-	}
-
-	if options.Type == Shared || options.Type == KeyShared {
-		if options.AckTimeout != 0 {
-			c.unAckTracker = NewUnackedMessageTracker()
-			c.unAckTracker.pcs = append(c.unAckTracker.pcs, c)
-			c.unAckTracker.Start(int64(options.AckTimeout))
-		}
-	}
-
-	err := c.grabCnx()
-	if err != nil {
-		log.WithError(err).Errorf("Failed to create consumer")
-		return nil, err
-	}
-	c.log = c.log.WithField("name", c.consumerName)
-	c.log.Info("Created consumer")
-	c.state = consumerReady
-
-	// In here, open a gorutine to receive data asynchronously from the subConsumer,
-	// filling the queue channel of the current consumer.
-	if partitionNum > 1 {
-		go func() {
-			err = c.ReceiveAsync(context.Background(), ch)
-			if err != nil {
-				return
-			}
-		}()
-	}
-
-	go c.runEventsLoop()
-
-	return c, nil
-}
-
-func (pc *partitionConsumer) setDefault(options *ConsumerOptions) {
-	if options.ReceiverQueueSize <= 0 {
-		options.ReceiverQueueSize = 1000
-	}
-
-	if options.AckTimeout == 0 {
-		options.AckTimeout = time.Second * 30
-	}
-}
-
-func (pc *partitionConsumer) grabCnx() error {
-	lr, err := pc.client.lookupService.Lookup(pc.topic)
-	if err != nil {
-		pc.log.WithError(err).Warn("Failed to lookup topic")
-		return err
-	}
-	pc.log.Debugf("Lookup result: %v", lr)
-
-	subType := toProtoSubType(pc.options.Type)
-	initialPosition := toProtoInitialPosition(pc.options.SubscriptionInitPos)
-	requestID := pc.client.rpcClient.NewRequestID()
-	res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
-		pb.BaseCommand_SUBSCRIBE, &pb.CommandSubscribe{
-			RequestId:       proto.Uint64(requestID),
-			Topic:           proto.String(pc.topic),
-			SubType:         subType.Enum(),
-			Subscription:    proto.String(pc.options.SubscriptionName),
-			ConsumerId:      proto.Uint64(pc.consumerID),
-			ConsumerName:    proto.String(pc.options.Name),
-			InitialPosition: initialPosition.Enum(),
-			Schema:          nil,
-		})
-
-	if err != nil {
-		pc.log.WithError(err).Error("Failed to create consumer")
-		return err
-	}
-
-	if res.Response.ConsumerStatsResponse != nil {
-		pc.consumerName = res.Response.ConsumerStatsResponse.ConsumerName
-	}
-
-	pc.cnx = res.Cnx
-	pc.log.WithField("cnx", res.Cnx).Debug("Connected consumer")
-	pc.cnx.AddConsumeHandler(pc.consumerID, pc)
-
-	msgType := res.Response.GetType()
-
-	switch msgType {
-	case pb.BaseCommand_SUCCESS:
-		return pc.internalFlow(uint32(pc.options.ReceiverQueueSize))
-	case pb.BaseCommand_ERROR:
-		errMsg := res.Response.GetError()
-		return fmt.Errorf("%s: %s", errMsg.GetError().String(), errMsg.GetMessage())
-	default:
-		return util.NewUnexpectedErrMsg(msgType, requestID)
-	}
-}
-
-func (pc *partitionConsumer) Topic() string {
-	return pc.topic
-}
-
-func (pc *partitionConsumer) Subscription() string {
-	return pc.options.SubscriptionName
-}
-
-func (pc *partitionConsumer) Unsubscribe() error {
-	wg := &sync.WaitGroup{}
-	wg.Add(1)
-
-	hu := &handleUnsubscribe{
-		waitGroup: wg,
-		err:       nil,
-	}
-	pc.eventsChan <- hu
-
-	wg.Wait()
-	return hu.err
-}
-
-func (pc *partitionConsumer) internalUnsubscribe(unsub *handleUnsubscribe) {
-	requestID := pc.client.rpcClient.NewRequestID()
-	_, err := pc.client.rpcClient.RequestOnCnx(pc.cnx, requestID,
-		pb.BaseCommand_UNSUBSCRIBE, &pb.CommandUnsubscribe{
-			RequestId:  proto.Uint64(requestID),
-			ConsumerId: proto.Uint64(pc.consumerID),
-		})
-	if err != nil {
-		pc.log.WithError(err).Error("Failed to unsubscribe consumer")
-		unsub.err = err
-	}
-
-	pc.cnx.DeleteConsumeHandler(pc.consumerID)
-	if pc.unAckTracker != nil {
-		pc.unAckTracker.Stop()
-	}
-
-	unsub.waitGroup.Done()
-}
-
-func (pc *partitionConsumer) trackMessage(msgID MessageID) error {
-	id := &pb.MessageIdData{}
-	err := proto.Unmarshal(msgID.Serialize(), id)
-	if err != nil {
-		pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
-		return err
-	}
-	if pc.unAckTracker != nil {
-		pc.unAckTracker.Add(id)
-	}
-	return nil
-}
-
-func (pc *partitionConsumer) increaseAvailablePermits() error {
-	pc.receivedSinceFlow++
-	highWater := uint32(math.Max(float64(pc.options.ReceiverQueueSize/2), 1))
-
-	pc.log.Debugf("receivedSinceFlow size is: %d, highWater size is: %d", pc.receivedSinceFlow, highWater)
-
-	// send flow request after 1/2 of the queue has been consumed
-	if pc.receivedSinceFlow >= highWater {
-		pc.log.Debugf("send flow command to broker, permits size is: %d", pc.receivedSinceFlow)
-		err := pc.internalFlow(pc.receivedSinceFlow)
-		if err != nil {
-			pc.log.Errorf("Send flow cmd error:%s", err.Error())
-			pc.receivedSinceFlow = 0
-			return err
-		}
-		pc.receivedSinceFlow = 0
-	}
-	return nil
-}
-
-func (pc *partitionConsumer) messageProcessed(msgID MessageID) error {
-	err := pc.trackMessage(msgID)
-	if err != nil {
-		return err
-	}
-
-	err = pc.increaseAvailablePermits()
-	if err != nil {
-		return err
-	}
-
-	return nil
-}
-
-func (pc *partitionConsumer) Receive(ctx context.Context) (message Message, err error) {
-	wg := &sync.WaitGroup{}
-	wg.Add(1)
-	pc.ReceiveAsyncWithCallback(ctx, func(msg Message, e error) {
-		message = msg
-		err = e
-		wg.Done()
-	})
-	wg.Wait()
-
-	return message, err
-}
-
-func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error {
-	for {
-		select {
-		case tmpMsg, ok := <-pc.subQueue:
-			if ok {
-				msgs <- tmpMsg
-
-				err := pc.messageProcessed(tmpMsg.ID())
-				if err != nil {
-					return err
-				}
-				continue
-			}
-			break
-		case <-ctx.Done():
-			return ctx.Err()
-		}
-	}
-}
-
-func (pc *partitionConsumer) ReceiveAsyncWithCallback(ctx context.Context, callback func(msg Message, err error)) {
-	var err error
-
-	select {
-	case tmpMsg, ok := <-pc.subQueue:
-		if ok {
-			err = pc.messageProcessed(tmpMsg.ID())
-			callback(tmpMsg.Message, err)
-			if err != nil {
-				pc.log.Errorf("processed messages error:%s", err.Error())
-				return
-			}
-		}
-	case <-ctx.Done():
-		pc.log.Errorf("context shouldn't done, please check error:%s", ctx.Err().Error())
-		return
-	}
-}
-
-func (pc *partitionConsumer) Ack(msg Message) error {
-	return pc.AckID(msg.ID())
-}
-
-func (pc *partitionConsumer) AckID(msgID MessageID) error {
-	wg := &sync.WaitGroup{}
-	wg.Add(1)
-	ha := &handleAck{
-		msgID:     msgID,
-		waitGroup: wg,
-		err:       nil,
-	}
-	pc.eventsChan <- ha
-	wg.Wait()
-	return ha.err
-}
-
-func (pc *partitionConsumer) internalAck(ack *handleAck) {
-	id := &pb.MessageIdData{}
-	messageIDs := make([]*pb.MessageIdData, 0)
-	err := proto.Unmarshal(ack.msgID.Serialize(), id)
-	if err != nil {
-		pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
-		ack.err = err
-	}
-
-	messageIDs = append(messageIDs, id)
-
-	requestID := pc.client.rpcClient.NewRequestID()
-	_, err = pc.client.rpcClient.RequestOnCnxNoWait(pc.cnx, requestID,
-		pb.BaseCommand_ACK, &pb.CommandAck{
-			ConsumerId: proto.Uint64(pc.consumerID),
-			MessageId:  messageIDs,
-			AckType:    pb.CommandAck_Individual.Enum(),
-		})
-	if err != nil {
-		pc.log.WithError(err).Error("Failed to unsubscribe consumer")
-		ack.err = err
-	}
-
-	if pc.unAckTracker != nil {
-		pc.unAckTracker.Remove(id)
-	}
-	ack.waitGroup.Done()
-}
-
-func (pc *partitionConsumer) AckCumulative(msg Message) error {
-	return pc.AckCumulativeID(msg.ID())
-}
-
-func (pc *partitionConsumer) AckCumulativeID(msgID MessageID) error {
-	hac := &handleAckCumulative{
-		msgID: msgID,
-		err:   nil,
-	}
-	pc.eventsChan <- hac
-
-	return hac.err
-}
-
-func (pc *partitionConsumer) internalAckCumulative(ackCumulative *handleAckCumulative) {
-	id := &pb.MessageIdData{}
-	messageIDs := make([]*pb.MessageIdData, 0)
-	err := proto.Unmarshal(ackCumulative.msgID.Serialize(), id)
-	if err != nil {
-		pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
-		ackCumulative.err = err
-	}
-	messageIDs = append(messageIDs, id)
-
-	requestID := pc.client.rpcClient.NewRequestID()
-	_, err = pc.client.rpcClient.RequestOnCnx(pc.cnx, requestID,
-		pb.BaseCommand_ACK, &pb.CommandAck{
-			ConsumerId: proto.Uint64(pc.consumerID),
-			MessageId:  messageIDs,
-			AckType:    pb.CommandAck_Cumulative.Enum(),
-		})
-	if err != nil {
-		pc.log.WithError(err).Error("Failed to unsubscribe consumer")
-		ackCumulative.err = err
-	}
-
-	if pc.unAckTracker != nil {
-		pc.unAckTracker.Remove(id)
-	}
-}
-
-func (pc *partitionConsumer) Close() error {
-	if pc.state != consumerReady {
-		return nil
-	}
-	if pc.unAckTracker != nil {
-		pc.unAckTracker.Stop()
-	}
-
-	wg := sync.WaitGroup{}
-	wg.Add(1)
-
-	cc := &handlerClose{&wg, nil}
-	pc.eventsChan <- cc
-
-	wg.Wait()
-	return cc.err
-}
-
-func (pc *partitionConsumer) Seek(msgID MessageID) error {
-	wg := &sync.WaitGroup{}
-	wg.Add(1)
-
-	hc := &handleSeek{
-		msgID:     msgID,
-		waitGroup: wg,
-		err:       nil,
-	}
-	pc.eventsChan <- hc
-
-	wg.Wait()
-	return hc.err
-}
-
-func (pc *partitionConsumer) internalSeek(seek *handleSeek) {
-	if pc.state == consumerClosing || pc.state == consumerClosed {
-		pc.log.Error("Consumer was already closed")
-		return
-	}
-
-	id := &pb.MessageIdData{}
-	err := proto.Unmarshal(seek.msgID.Serialize(), id)
-	if err != nil {
-		pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
-		seek.err = err
-	}
-
-	requestID := pc.client.rpcClient.NewRequestID()
-	_, err = pc.client.rpcClient.RequestOnCnx(pc.cnx, requestID,
-		pb.BaseCommand_SEEK, &pb.CommandSeek{
-			ConsumerId: proto.Uint64(pc.consumerID),
-			RequestId:  proto.Uint64(requestID),
-			MessageId:  id,
-		})
-	if err != nil {
-		pc.log.WithError(err).Error("Failed to unsubscribe consumer")
-		seek.err = err
-	}
-
-	seek.waitGroup.Done()
-}
-
-func (pc *partitionConsumer) RedeliverUnackedMessages() error {
-	wg := &sync.WaitGroup{}
-	wg.Add(1)
-
-	hr := &handleRedeliver{
-		waitGroup: wg,
-		err:       nil,
-	}
-	pc.eventsChan <- hr
-	wg.Wait()
-	return hr.err
-}
-
-func (pc *partitionConsumer) internalRedeliver(redeliver *handleRedeliver) {
-	pc.omu.Lock()
-	defer pc.omu.Unlock()
-
-	redeliverMessagesSize := len(pc.redeliverMessages)
-
-	if redeliverMessagesSize == 0 {
-		return
-	}
-
-	requestID := pc.client.rpcClient.NewRequestID()
-
-	for i := 0; i < len(pc.redeliverMessages); i += maxRedeliverUnacknowledged {
-		end := i + maxRedeliverUnacknowledged
-		if end > redeliverMessagesSize {
-			end = redeliverMessagesSize
-		}
-		_, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.cnx, requestID,
-			pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, &pb.CommandRedeliverUnacknowledgedMessages{
-				ConsumerId: proto.Uint64(pc.consumerID),
-				MessageIds: pc.redeliverMessages[i:end],
-			})
-		if err != nil {
-			pc.log.WithError(err).Error("Failed to unsubscribe consumer")
-			redeliver.err = err
-		}
-	}
-
-	// clear redeliverMessages slice
-	pc.redeliverMessages = nil
-
-	if pc.unAckTracker != nil {
-		pc.unAckTracker.clear()
-	}
-	redeliver.waitGroup.Done()
-}
-
-func (pc *partitionConsumer) runEventsLoop() {
-	for {
-		select {
-		case i := <-pc.eventsChan:
-			switch v := i.(type) {
-			case *handlerClose:
-				pc.internalClose(v)
-				return
-			case *handleSeek:
-				pc.internalSeek(v)
-			case *handleUnsubscribe:
-				pc.internalUnsubscribe(v)
-			case *handleAckCumulative:
-				pc.internalAckCumulative(v)
-			case *handleAck:
-				pc.internalAck(v)
-			case *handleRedeliver:
-				pc.internalRedeliver(v)
-			case *handleConnectionClosed:
-				pc.reconnectToBroker()
-			}
-		}
-	}
-}
-
-func (pc *partitionConsumer) internalClose(req *handlerClose) {
-	if pc.state != consumerReady {
-		req.waitGroup.Done()
-		return
-	}
-
-	pc.state = consumerClosing
-	pc.log.Info("Closing consumer")
-
-	requestID := pc.client.rpcClient.NewRequestID()
-	_, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.cnx, requestID, pb.BaseCommand_CLOSE_CONSUMER, &pb.CommandCloseConsumer{
-		ConsumerId: proto.Uint64(pc.consumerID),
-		RequestId:  proto.Uint64(requestID),
-	})
-	pc.cnx.DeleteConsumeHandler(pc.consumerID)
-
-	if err != nil {
-		req.err = err
-	} else {
-		pc.log.Info("Closed consumer")
-		pc.state = consumerClosed
-		close(pc.options.MessageChannel)
-	}
-
-	req.waitGroup.Done()
-}
-
-// Flow command gives additional permits to send messages to the consumer.
-// A typical consumer implementation will use a queue to accuMulate these messages
-// before the application is ready to consume them. After the consumer is ready,
-// the client needs to give permission to the broker to push messages.
-func (pc *partitionConsumer) internalFlow(permits uint32) error {
-	if permits <= 0 {
-		return fmt.Errorf("invalid number of permits requested: %d", permits)
-	}
-
-	requestID := pc.client.rpcClient.NewRequestID()
-	_, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.cnx, requestID,
-		pb.BaseCommand_FLOW, &pb.CommandFlow{
-			ConsumerId:     proto.Uint64(pc.consumerID),
-			MessagePermits: proto.Uint32(permits),
-		})
-
-	if err != nil {
-		pc.log.WithError(err).Error("Failed to unsubscribe consumer")
-		return err
-	}
-	return nil
-}
-
-func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
-	pbMsgID := response.GetMessageId()
-	reader := internal.NewMessageReader(headersAndPayload.ReadableSlice())
-	msgMeta, err := reader.ReadMessageMetadata()
-	if err != nil {
-		// TODO send discardCorruptedMessage
-		return err
-	}
-
-	numMsgs := 1
-	if msgMeta.NumMessagesInBatch != nil {
-		numMsgs = int(msgMeta.GetNumMessagesInBatch())
-	}
-	for i := 0; i < numMsgs; i++ {
-		ssm, payload, err := reader.ReadMessage()
-		if err != nil {
-			// TODO send
-			return err
-		}
-
-		msgID := newMessageID(int64(pbMsgID.GetLedgerId()), int64(pbMsgID.GetEntryId()), i, pc.partitionIdx)
-		var msg Message
-		if ssm == nil {
-			msg = &message{
-				publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
-				eventTime:   timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
-				key:         msgMeta.GetPartitionKey(),
-				properties:  internal.ConvertToStringMap(msgMeta.GetProperties()),
-				topic:       pc.topic,
-				msgID:       msgID,
-				payLoad:     payload,
-			}
-		} else {
-			msg = &message{
-				publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
-				eventTime:   timeFromUnixTimestampMillis(ssm.GetEventTime()),
-				key:         ssm.GetPartitionKey(),
-				properties:  internal.ConvertToStringMap(ssm.GetProperties()),
-				topic:       pc.topic,
-				msgID:       msgID,
-				payLoad:     payload,
-			}
-		}
-
-		if err := pc.dispatchMessage(msg, pbMsgID); err != nil {
-			// TODO handle error
-			return err
-		}
-	}
-
-	return nil
-}
-
-
-func (pc *partitionConsumer) dispatchMessage(msg Message, msgID *pb.MessageIdData) error {
-	select {
-	case pc.subQueue <- ConsumerMessage{Consumer:pc, Message:msg}:
-		//Add messageId to redeliverMessages buffer, avoiding duplicates.
-		var dup bool
-
-		pc.omu.Lock()
-		for _, mid := range pc.redeliverMessages {
-			if proto.Equal(mid, msgID) {
-				dup = true
-				break
-			}
-		}
-
-		if !dup {
-			pc.redeliverMessages = append(pc.redeliverMessages, msgID)
-		}
-		pc.omu.Unlock()
-	default:
-		return fmt.Errorf("consumer message channel on topic %s is full (capacity = %d)", pc.Topic(), cap(pc.options.MessageChannel))
-	}
-	return nil
-}
-
-type handleAck struct {
-	msgID     MessageID
-	waitGroup *sync.WaitGroup
-	err       error
-}
-
-type handleAckCumulative struct {
-	msgID MessageID
-	err   error
-}
-
-type handleUnsubscribe struct {
-	waitGroup *sync.WaitGroup
-	err       error
-}
-
-type handleSeek struct {
-	msgID     MessageID
-	waitGroup *sync.WaitGroup
-	err       error
-}
-
-type handleRedeliver struct {
-	waitGroup *sync.WaitGroup
-	err       error
-}
-
-type handlerClose struct {
-	waitGroup *sync.WaitGroup
-	err       error
-}
-
-type handleConnectionClosed struct{}
-
-func (pc *partitionConsumer) ConnectionClosed() {
-	// Trigger reconnection in the consumer goroutine
-	pc.eventsChan <- &handleConnectionClosed{}
-}
-
-func (pc *partitionConsumer) reconnectToBroker() {
-	backoff := internal.Backoff{}
-	for {
-		if pc.state != consumerReady {
-			// Consumer is already closing
-			return
-		}
-
-		d := backoff.Next()
-		pc.log.Info("Reconnecting to broker in ", d)
-		time.Sleep(d)
-
-		err := pc.grabCnx()
-		if err == nil {
-			// Successfully reconnected
-			pc.log.Info("Reconnected consumer to broker")
-			return
-		}
-	}
-}
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index a68e34a..b1d4faa 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -34,7 +34,6 @@ import (
 
 	"github.com/apache/pulsar-client-go/pkg/auth"
 	"github.com/apache/pulsar-client-go/pkg/pb"
-	"github.com/apache/pulsar-client-go/util"
 )
 
 type TLSOptions struct {
@@ -286,7 +285,10 @@ func (c *connection) run() {
 			if req == nil {
 				return
 			}
-			c.pendingReqs[req.id] = req
+			// does this request expect a response?
+			if req.id != RequestIDNoResponse {
+				c.pendingReqs[req.id] = req
+			}
 			c.writeCommand(req.cmd)
 
 		case cmd := <- c.incomingCmdCh:
@@ -509,9 +511,7 @@ func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer)
 	c.log.Infof("Broker notification of Closed consumer: %d", closeConsumer.GetConsumerId())
 	consumerID := closeConsumer.GetConsumerId()
 	if consumer, ok := c.consumerHandler(consumerID); ok {
-		if !util.IsNil(consumer) {
-			consumer.ConnectionClosed()
-		}
+		consumer.ConnectionClosed()
 	} else {
 		c.log.WithField("consumerID", consumerID).Warnf("Consumer with ID not found while closing consumer")
 	}
@@ -564,9 +564,14 @@ func (c *connection) Close() {
 		listener.ConnectionClosed()
 	}
 
+	consumerHandlers := make(map[uint64]ConsumerHandler)
 	c.consumerHandlersLock.RLock()
-	defer c.consumerHandlersLock.RUnlock()
-	for _, handler := range c.consumerHandlers {
+	for id, handler := range c.consumerHandlers {
+		consumerHandlers[id] = handler
+	}
+	c.consumerHandlersLock.RUnlock()
+
+	for _, handler := range consumerHandlers {
 		handler.ConnectionClosed()
 	}
 }
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 35d8b61..6a01fe0 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -31,6 +31,9 @@ type RPCResult struct {
 	Cnx      Connection
 }
 
+// RequestID for a request when there is no expected response
+const RequestIDNoResponse = uint64(0)
+
 type RPCClient interface {
 	// Create a new unique request id
 	NewRequestID() uint64
diff --git a/pulsar/test_helper.go b/pulsar/test_helper.go
index 9930a99..1762fd7 100644
--- a/pulsar/test_helper.go
+++ b/pulsar/test_helper.go
@@ -23,6 +23,8 @@ import (
 	"fmt"
 	"log"
 	"net/http"
+	"strings"
+	"testing"
 	"time"
 )
 
@@ -63,3 +65,21 @@ func httpPut(url string, body interface{}) {
 	}
 	resp.Body.Close()
 }
+
+func makeHTTPCall(t *testing.T, method string, urls string, body string) {
+	client := http.Client{}
+
+	req, err := http.NewRequest(method, urls, strings.NewReader(body))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	req.Header.Set("Content-Type", "application/json")
+	req.Header.Set("Accept", "application/json")
+
+	res, err := client.Do(req)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer res.Body.Close()
+}
diff --git a/pulsar/unacked_msg_tracker.go b/pulsar/unacked_msg_tracker.go
index ffc6eff..5dfe895 100644
--- a/pulsar/unacked_msg_tracker.go
+++ b/pulsar/unacked_msg_tracker.go
@@ -174,7 +174,7 @@ func (t *UnackedMessageTracker) handlerCmd() {
 								MessageIds: messageIdsMap[int32(index)],
 							}
 
-							_, err := subConsumer.client.rpcClient.RequestOnCnx(subConsumer.cnx, requestID,
+							_, err := subConsumer.client.rpcClient.RequestOnCnx(subConsumer.conn, requestID,
 								pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, cmd)
 							if err != nil {
 								subConsumer.log.WithError(err).Error("Failed to unsubscribe consumer")
diff --git a/util/util.go b/util/util.go
deleted file mode 100644
index bd4f5d6..0000000
--- a/util/util.go
+++ /dev/null
@@ -1,44 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package util
-
-import (
-	"reflect"
-)
-
-// IsNil check if the interface is nil
-func IsNil(i interface{}) bool {
-	vi := reflect.ValueOf(i)
-	if vi.Kind() == reflect.Ptr {
-		return vi.IsNil()
-	}
-	return false
-}
-
-// RemoveDuplicateElement remove repeating elements from the string slice
-func RemoveDuplicateElement(addrs []string) []string {
-	result := make([]string, 0, len(addrs))
-	temp := map[string]struct{}{}
-	for _, item := range addrs {
-		if _, ok := temp[item]; !ok {
-			temp[item] = struct{}{}
-			result = append(result, item)
-		}
-	}
-	return result
-}
diff --git a/util/util_test.go b/util/util_test.go
deleted file mode 100644
index 3b0a9f9..0000000
--- a/util/util_test.go
+++ /dev/null
@@ -1,41 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package util
-
-import (
-	"fmt"
-	"strings"
-	"testing"
-
-	"github.com/stretchr/testify/assert"
-)
-
-func TestIsNil(t *testing.T) {
-	var a interface{}
-	var b interface{} = (*int)(nil)
-
-	assert.True(t, a == nil)
-	assert.False(t, b == nil)
-}
-
-func TestRemoveDuplicateElement(t *testing.T) {
-	s := []string{"hello", "world", "hello", "golang", "hello", "ruby", "php", "java"}
-	resList := RemoveDuplicateElement(s)
-	res := fmt.Sprintf("%s", resList)
-	assert.Equal(t, 1, strings.Count(res, "hello"))
-}