You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2019/08/14 03:00:06 UTC

[pulsar-client-go] branch master updated: Support partition consumer receive async and fix batch logic (#43)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 448387d  Support partition consumer receive async and fix batch logic (#43)
448387d is described below

commit 448387d738a2f3af4c8232daa4fac9576d252617
Author: 冉小龙 <ra...@gmail.com>
AuthorDate: Wed Aug 14 11:00:00 2019 +0800

    Support partition consumer receive async and fix batch logic (#43)
    
    Signed-off-by: xiaolong.ran ranxiaolong716@gmail.com
    
    * Support batch logic for project
    
    * add unit test case of event time
    
    * add some unit test cases for producer
    
    * fix error result type
    
    * add unit test case of producer flush
    
    * add receiver queue size test logic
    
    * support partition consumer receive async
    
    * add unit test case of ack timeout
    
    * Fix consumer receiving message out of order
---
 pulsar/consumer.go                |   3 +
 pulsar/consumer_test.go           | 301 +++++++++++++++++++++++++++++++++++++-
 pulsar/error.go                   |   2 +-
 pulsar/impl_consumer.go           |  55 +++++--
 pulsar/impl_partition_consumer.go | 217 ++++++++++++++++-----------
 pulsar/impl_partition_producer.go |  21 ++-
 pulsar/internal/commands.go       |  82 +++++++----
 pulsar/internal/connection.go     |  54 +++----
 pulsar/producer_test.go           | 260 +++++++++++++++++++++++++++++++-
 pulsar/unackedMsgTracker.go       |  20 +--
 util/util.go                      |  24 ++-
 util/util_test.go                 |  21 ++-
 12 files changed, 870 insertions(+), 190 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index de190e0..c259cd6 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -142,6 +142,9 @@ type Consumer interface {
 	// ReceiveAsync appends the message to the msgs channel asynchronously.
 	ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error
 
+	// ReceiveAsyncWithCallback invokes the callback with the received message and any error
+	ReceiveAsyncWithCallback(ctx context.Context, callback func(msg Message, err error))
+
 	// Ack the consumption of a single message
 	Ack(Message) error
 
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 39646d3..6fe86cd 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package pulsar
 
@@ -124,6 +122,67 @@ func TestConsumerConnectError(t *testing.T) {
 	assert.Equal(t, err.Error(), "connection error")
 }
 
+func TestBatchMessageReceive(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topicName := "persistent://public/default/receive-batch"
+	subName := "subscription-name"
+	prefix := "msg-batch-"
+	ctx := context.Background()
+
+	// Enable batching on producer side
+	batchSize, numOfMessages := 2, 100
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:               topicName,
+		BatchingMaxMessages: uint(batchSize),
+		DisableBatching:     false,
+		BlockIfQueueFull:    true,
+	})
+	assert.Nil(t, err)
+	assert.Equal(t, topicName, producer.Topic())
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: subName,
+	})
+	assert.Equal(t, topicName, consumer.Topic())
+	count := 0
+
+	for i := 0; i < numOfMessages; i++ {
+		messageContent := prefix + fmt.Sprintf("%d", i)
+		msg := &ProducerMessage{
+			Payload: []byte(messageContent),
+		}
+		err := producer.Send(ctx, msg)
+		assert.Nil(t, err)
+	}
+
+	for i := 0; i < numOfMessages; i++ {
+		msg, err := consumer.Receive(ctx)
+		assert.Nil(t, err)
+		err = consumer.Ack(msg)
+		assert.Nil(t, err)
+		count++
+	}
+
+	// check strategically
+	for i := 0; i < 3; i++ {
+		if count == numOfMessages {
+			break
+		}
+		time.Sleep(time.Second)
+	}
+	assert.Equal(t, count, numOfMessages)
+}
+
 func TestConsumerWithInvalidConf(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL: lookupURL,
@@ -263,7 +322,7 @@ func TestConsumerKeyShared(t *testing.T) {
 		assert.Nil(t, err)
 	}
 
-	time.Sleep(time.Second * 5)
+	time.Sleep(time.Second * 1)
 
 	go func() {
 		for i := 0; i < 10; i++ {
@@ -288,6 +347,8 @@ func TestConsumerKeyShared(t *testing.T) {
 			}
 		}
 	}()
+
+	time.Sleep(time.Second * 1)
 }
 
 func TestPartitionTopicsConsumerPubSub(t *testing.T) {
@@ -300,7 +361,7 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
 	topic := "persistent://public/default/testGetPartitions"
 	testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions/partitions"
 
-	makeHTTPCall(t, http.MethodPut, testURL, "3")
+	makeHTTPCall(t, http.MethodPut, testURL, "5")
 
 	// create producer
 	producer, err := client.CreateProducer(ProducerOptions{
@@ -316,9 +377,10 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
 	assert.Equal(t, topic+"-partition-2", topics[2])
 
 	consumer, err := client.Subscribe(ConsumerOptions{
-		Topic:            topic,
-		SubscriptionName: "my-sub",
-		Type:             Exclusive,
+		Topic:             topic,
+		SubscriptionName:  "my-sub",
+		Type:              Exclusive,
+		ReceiverQueueSize: 10,
 	})
 	assert.Nil(t, err)
 	defer consumer.Close()
@@ -348,3 +410,228 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
 
 	assert.Equal(t, len(msgs), 10)
 }
+
+func TestConsumer_ReceiveAsync(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topicName := "persistent://public/default/receive-async"
+	subName := "subscription-receive-async"
+	ctx := context.Background()
+	ch := make(chan ConsumerMessage, 10)
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topicName,
+	})
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: subName,
+	})
+	defer consumer.Close()
+
+	//send 10 messages
+	for i := 0; i < 10; i++ {
+		err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		})
+		assert.Nil(t, err)
+	}
+
+	//receive async 10 messages
+	err = consumer.ReceiveAsync(ctx, ch)
+	assert.Nil(t, err)
+
+	payloadList := make([]string, 0, 10)
+
+RECEIVE:
+	for {
+		select {
+		case cMsg, ok := <-ch:
+			if ok {
+				fmt.Printf("receive message payload is:%s\n", string(cMsg.Payload()))
+				assert.Equal(t, topicName, cMsg.Message.Topic())
+				assert.Equal(t, topicName, cMsg.Consumer.Topic())
+				payloadList = append(payloadList, string(cMsg.Message.Payload()))
+				if len(payloadList) == 10 {
+					break RECEIVE
+				}
+			}
+			continue RECEIVE
+		case <-ctx.Done():
+			t.Error("context error.")
+			return
+		}
+	}
+}
+
+func TestConsumerAckTimeout(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := "test-ack-timeout-topic-1"
+	ctx := context.Background()
+
+	// create consumer
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "my-sub1",
+		Type:             Shared,
+		AckTimeout:       5 * 1000,
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	// create consumer1
+	consumer1, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "my-sub2",
+		Type:             Shared,
+		AckTimeout:       5 * 1000,
+	})
+	assert.Nil(t, err)
+	defer consumer1.Close()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           topic,
+		DisableBatching: true,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	// send 10 messages
+	for i := 0; i < 10; i++ {
+		if err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		}); err != nil {
+			log.Fatal(err)
+		}
+	}
+
+	// consumer receive 10 messages
+	payloadList := make([]string, 0, 10)
+	for i := 0; i < 10; i++ {
+		msg, err := consumer.Receive(context.Background())
+		if err != nil {
+			log.Fatal(err)
+		}
+		payloadList = append(payloadList, string(msg.Payload()))
+
+		// not ack message
+	}
+	assert.Equal(t, 10, len(payloadList))
+
+	// consumer1 receive 10 messages
+	for i := 0; i < 10; i++ {
+		msg, err := consumer1.Receive(context.Background())
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		payloadList = append(payloadList, string(msg.Payload()))
+
+		// ack half of the messages
+		if i%2 == 0 {
+			err = consumer1.Ack(msg)
+			assert.Nil(t, err)
+		}
+	}
+
+	// wait ack timeout
+	time.Sleep(6 * time.Second)
+
+	fmt.Println("start redeliver messages...")
+
+	payloadList = make([]string, 0, 10)
+	// consumer receive messages again
+	for i := 0; i < 10; i++ {
+		msg, err := consumer.Receive(context.Background())
+		if err != nil {
+			log.Fatal(err)
+		}
+		payloadList = append(payloadList, string(msg.Payload()))
+
+		// ack message
+		if err := consumer.Ack(msg); err != nil {
+			log.Fatal(err)
+		}
+	}
+	assert.Equal(t, 10, len(payloadList))
+
+	payloadList = make([]string, 0, 5)
+	// consumer1 receive messages again
+	go func() {
+		for i := 0; i < 10; i++ {
+			msg, err := consumer1.Receive(context.Background())
+			if err != nil {
+				log.Fatal(err)
+			}
+
+			expectMsg := fmt.Sprintf("hello-%d", i)
+			fmt.Printf("redeliver messages, payload is:%s\n", expectMsg)
+			payloadList = append(payloadList, string(msg.Payload()))
+
+			// ack message
+			if err := consumer1.Ack(msg); err != nil {
+				log.Fatal(err)
+			}
+		}
+		assert.Equal(t, 5, len(payloadList))
+	}()
+
+	// sleep 2 seconds to let the goroutine receive messages.
+	time.Sleep(time.Second * 2)
+}
+
+func TestConsumer_ReceiveAsyncWithCallback(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topicName := "persistent://public/default/receive-async-with-callback"
+	subName := "subscription-receive-async"
+	ctx := context.Background()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topicName,
+	})
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: subName,
+	})
+	defer consumer.Close()
+
+	//send 10 messages
+	for i := 0; i < 10; i++ {
+		err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		})
+		assert.Nil(t, err)
+	}
+
+	for i := 0; i < 10; i++ {
+		consumer.ReceiveAsyncWithCallback(ctx, func(msg Message, err error) {
+			if err != nil {
+				log.Fatal(err)
+			}
+			fmt.Printf("receive message payload is:%s\n", string(msg.Payload()))
+			assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
+		})
+	}
+}
diff --git a/pulsar/error.go b/pulsar/error.go
index 0231913..ec20844 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -24,7 +24,7 @@ type Result int
 
 const (
 	// ResultOk means no errors
-	ResultOk = iota
+	ResultOk Result = iota
 	// ResultUnknownError means unknown error happened on broker
 	ResultUnknownError
 	// ResultInvalidConfiguration means invalid configuration
diff --git a/pulsar/impl_consumer.go b/pulsar/impl_consumer.go
index 0a44971..13e72ae 100644
--- a/pulsar/impl_consumer.go
+++ b/pulsar/impl_consumer.go
@@ -79,6 +79,7 @@ func newConsumer(client *client, options *ConsumerOptions) (*consumer, error) {
 func singleTopicSubscribe(client *client, options *ConsumerOptions, topic string) (*consumer, error) {
 	c := &consumer{
 		topicName: topic,
+		log:       log.WithField("topic", topic),
 		queue:     make(chan ConsumerMessage, options.ReceiverQueueSize),
 	}
 
@@ -100,7 +101,7 @@ func singleTopicSubscribe(client *client, options *ConsumerOptions, topic string
 
 	for partitionIdx, partitionTopic := range partitions {
 		go func(partitionIdx int, partitionTopic string) {
-			cons, err := newPartitionConsumer(client, partitionTopic, options, partitionIdx)
+			cons, err := newPartitionConsumer(client, partitionTopic, options, partitionIdx, numPartitions, c.queue)
 			ch <- ConsumerError{
 				err:       err,
 				partition: partitionIdx,
@@ -153,31 +154,63 @@ func (c *consumer) Unsubscribe() error {
 	return nil
 }
 
-func (c *consumer) Receive(ctx context.Context) (Message, error) {
+func (c *consumer) getMessageFromSubConsumer(ctx context.Context) {
 	for _, pc := range c.consumers {
 		go func(pc Consumer) {
-			if err := pc.ReceiveAsync(ctx, c.queue); err != nil {
+			err := pc.ReceiveAsync(ctx, c.queue)
+			if err != nil {
 				return
 			}
 		}(pc)
 	}
+}
 
-	select {
-	case <-ctx.Done():
-		return nil, ctx.Err()
-	case msg, ok := <-c.queue:
-		if ok {
-			return msg.Message, nil
+func (c *consumer) Receive(ctx context.Context) (message Message, err error) {
+	if len(c.consumers) > 1 {
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		case cMsg, ok := <-c.queue:
+			if ok {
+				return cMsg.Message, nil
+			}
+			return nil, errors.New("receive message error")
 		}
-		return nil, errors.New("receive message error")
 	}
+
+	return c.consumers[0].(*partitionConsumer).Receive(ctx)
 }
 
 func (c *consumer) ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error {
-	//TODO: impl logic
+	for _, pc := range c.consumers {
+		go func(pc Consumer) {
+			if err := pc.ReceiveAsync(ctx, msgs); err != nil {
+				c.log.Errorf("receive async messages error:%s, please check.", err.Error())
+				return
+			}
+		}(pc)
+	}
+
 	return nil
 }
 
+func (c *consumer) ReceiveAsyncWithCallback(ctx context.Context, callback func(msg Message, err error)) {
+	var err error
+	if len(c.consumers) > 1 {
+		select {
+		case <-ctx.Done():
+			c.log.Errorf("ReceiveAsyncWithCallback: receive message error:%s", ctx.Err().Error())
+			return
+		case cMsg, ok := <-c.queue:
+			if ok {
+				callback(cMsg.Message, err)
+			}
+			return
+		}
+	}
+	c.consumers[0].(*partitionConsumer).ReceiveAsyncWithCallback(ctx, callback)
+}
+
 //Ack the consumption of a single message
 func (c *consumer) Ack(msg Message) error {
 	return c.AckID(msg.ID())
diff --git a/pulsar/impl_partition_consumer.go b/pulsar/impl_partition_consumer.go
index 0d7069f..87cf68b 100644
--- a/pulsar/impl_partition_consumer.go
+++ b/pulsar/impl_partition_consumer.go
@@ -37,7 +37,7 @@ const maxRedeliverUnacknowledged = 1000
 type consumerState int
 
 const (
-	consumerInit = iota
+	consumerInit consumerState = iota
 	consumerReady
 	consumerClosing
 	consumerClosed
@@ -60,16 +60,17 @@ type partitionConsumer struct {
 	consumerID   uint64
 	subQueue     chan ConsumerMessage
 
-	omu      sync.Mutex // protects following
-	overflow []*pb.MessageIdData
+	omu               sync.Mutex // protects following
+	redeliverMessages []*pb.MessageIdData
 
 	unAckTracker *UnackedMessageTracker
 
 	eventsChan   chan interface{}
 	partitionIdx int
+	partitionNum int
 }
 
-func newPartitionConsumer(client *client, topic string, options *ConsumerOptions, partitionID int) (*partitionConsumer, error) {
+func newPartitionConsumer(client *client, topic string, options *ConsumerOptions, partitionID, partitionNum int, ch chan ConsumerMessage) (*partitionConsumer, error) {
 	c := &partitionConsumer{
 		state:        consumerInit,
 		client:       client,
@@ -78,6 +79,7 @@ func newPartitionConsumer(client *client, topic string, options *ConsumerOptions
 		log:          log.WithField("topic", topic),
 		consumerID:   client.rpcClient.NewConsumerID(),
 		partitionIdx: partitionID,
+		partitionNum: partitionNum,
 		eventsChan:   make(chan interface{}),
 		subQueue:     make(chan ConsumerMessage, options.ReceiverQueueSize),
 	}
@@ -108,7 +110,7 @@ func newPartitionConsumer(client *client, topic string, options *ConsumerOptions
 	if options.Type == Shared || options.Type == KeyShared {
 		if options.AckTimeout != 0 {
 			c.unAckTracker = NewUnackedMessageTracker()
-			c.unAckTracker.pc = c
+			c.unAckTracker.pcs = append(c.unAckTracker.pcs, c)
 			c.unAckTracker.Start(int64(options.AckTimeout))
 		}
 	}
@@ -128,6 +130,18 @@ func newPartitionConsumer(client *client, topic string, options *ConsumerOptions
 	c.log = c.log.WithField("name", c.consumerName)
 	c.log.Info("Created consumer")
 	c.state = consumerReady
+
+	// Start a goroutine here to receive data asynchronously from the subConsumer,
+	// filling the queue channel of the current consumer.
+	if partitionNum > 1 {
+		go func() {
+			err = c.ReceiveAsync(context.Background(), ch)
+			if err != nil {
+				return
+			}
+		}()
+	}
+
 	go c.runEventsLoop()
 
 	return c, nil
@@ -238,35 +252,60 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *handleUnsubscribe) {
 	unsub.waitGroup.Done()
 }
 
-func (pc *partitionConsumer) Receive(ctx context.Context) (Message, error) {
-	select {
-	case <-ctx.Done():
-		return nil, ctx.Err()
-	case cm, ok := <-pc.subQueue:
-		if ok {
-			id := &pb.MessageIdData{}
-			err := proto.Unmarshal(cm.ID().Serialize(), id)
-			if err != nil {
-				pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
-				return nil, err
-			}
-			if pc.unAckTracker != nil {
-				pc.unAckTracker.Add(id)
-			}
-			return cm.Message, nil
-		}
-		return nil, newError(ResultConnectError, "receive queue closed")
+func (pc *partitionConsumer) trackMessage(msgID MessageID) error {
+	id := &pb.MessageIdData{}
+	err := proto.Unmarshal(msgID.Serialize(), id)
+	if err != nil {
+		pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
+		return err
+	}
+	if pc.unAckTracker != nil {
+		pc.unAckTracker.Add(id)
 	}
+	return nil
 }
 
-func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error {
+func (pc *partitionConsumer) increaseAvailablePermits(receivedSinceFlow uint32) error {
 	highwater := uint32(math.Max(float64(cap(pc.options.MessageChannel)/2), 1))
+	if receivedSinceFlow >= highwater {
+		if err := pc.internalFlow(receivedSinceFlow); err != nil {
+			pc.log.Errorf("Send Flow cmd error:%s", err.Error())
+			return err
+		}
+		receivedSinceFlow = 0
+	}
+	return nil
+}
+
+func (pc *partitionConsumer) messageProcessed(msgID MessageID, receivedSinceFlow uint32) error {
+	err := pc.trackMessage(msgID)
+	if err != nil {
+		return err
+	}
+	receivedSinceFlow++
 
-	// request half the buffer's capacity
-	if err := pc.internalFlow(highwater); err != nil {
-		pc.log.Errorf("Send Flow cmd error:%s", err.Error())
+	err = pc.increaseAvailablePermits(receivedSinceFlow)
+	if err != nil {
 		return err
 	}
+
+	return nil
+}
+
+func (pc *partitionConsumer) Receive(ctx context.Context) (message Message, err error) {
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
+	pc.ReceiveAsyncWithCallback(ctx, func(msg Message, e error) {
+		message = msg
+		err = e
+		wg.Done()
+	})
+	wg.Wait()
+
+	return message, err
+}
+
+func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error {
 	var receivedSinceFlow uint32
 
 	for {
@@ -274,30 +313,38 @@ func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- Consu
 		case tmpMsg, ok := <-pc.subQueue:
 			if ok {
 				msgs <- tmpMsg
-				id := &pb.MessageIdData{}
-				err := proto.Unmarshal(tmpMsg.ID().Serialize(), id)
+
+				err := pc.messageProcessed(tmpMsg.ID(), receivedSinceFlow)
 				if err != nil {
-					pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
 					return err
 				}
-				if pc.unAckTracker != nil {
-					pc.unAckTracker.Add(id)
-				}
-				receivedSinceFlow++
-				if receivedSinceFlow >= highwater {
-					if err := pc.internalFlow(receivedSinceFlow); err != nil {
-						pc.log.Errorf("Send Flow cmd error:%s", err.Error())
-						return err
-					}
-					receivedSinceFlow = 0
-				}
 				continue
 			}
+			break
 		case <-ctx.Done():
 			return ctx.Err()
 		}
 	}
+}
 
+func (pc *partitionConsumer) ReceiveAsyncWithCallback(ctx context.Context, callback func(msg Message, err error)) {
+	var receivedSinceFlow uint32
+	var err error
+
+	select {
+	case tmpMsg, ok := <-pc.subQueue:
+		if ok {
+			err = pc.messageProcessed(tmpMsg.ID(), receivedSinceFlow)
+			callback(tmpMsg.Message, err)
+			if err != nil {
+				pc.log.Errorf("processed messages error:%s", err.Error())
+				return
+			}
+		}
+	case <-ctx.Done():
+		pc.log.Errorf("context shouldn't done, please check error:%s", ctx.Err().Error())
+		return
+	}
 }
 
 func (pc *partitionConsumer) Ack(msg Message) error {
@@ -465,23 +512,23 @@ func (pc *partitionConsumer) internalRedeliver(redeliver *handleRedeliver) {
 	pc.omu.Lock()
 	defer pc.omu.Unlock()
 
-	overFlowSize := len(pc.overflow)
+	redeliverMessagesSize := len(pc.redeliverMessages)
 
-	if overFlowSize == 0 {
+	if redeliverMessagesSize == 0 {
 		return
 	}
 
 	requestID := pc.client.rpcClient.NewRequestID()
 
-	for i := 0; i < len(pc.overflow); i += maxRedeliverUnacknowledged {
+	for i := 0; i < len(pc.redeliverMessages); i += maxRedeliverUnacknowledged {
 		end := i + maxRedeliverUnacknowledged
-		if end > overFlowSize {
-			end = overFlowSize
+		if end > redeliverMessagesSize {
+			end = redeliverMessagesSize
 		}
 		_, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.cnx, requestID,
 			pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, &pb.CommandRedeliverUnacknowledgedMessages{
 				ConsumerId: proto.Uint64(pc.consumerID),
-				MessageIds: pc.overflow[i:end],
+				MessageIds: pc.redeliverMessages[i:end],
 			})
 		if err != nil {
 			pc.log.WithError(err).Error("Failed to unsubscribe consumer")
@@ -489,8 +536,8 @@ func (pc *partitionConsumer) internalRedeliver(redeliver *handleRedeliver) {
 		}
 	}
 
-	// clear Overflow slice
-	pc.overflow = nil
+	// clear redeliverMessages slice
+	pc.redeliverMessages = nil
 
 	if pc.unAckTracker != nil {
 		pc.unAckTracker.clear()
@@ -574,56 +621,58 @@ func (pc *partitionConsumer) internalFlow(permits uint32) error {
 	return nil
 }
 
-func (pc *partitionConsumer) HandlerMessage(response *pb.CommandMessage, headersAndPayload []byte) error {
+func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload []byte) error {
 	msgID := response.GetMessageId()
 
 	id := newMessageID(int64(msgID.GetLedgerId()), int64(msgID.GetEntryId()),
 		int(msgID.GetBatchIndex()), pc.partitionIdx)
 
-	msgMeta, payload, err := internal.ParseMessage(headersAndPayload)
+	msgMeta, payloadList, err := internal.ParseMessage(headersAndPayload)
 	if err != nil {
 		return fmt.Errorf("parse message error:%s", err)
 	}
 
-	//numMsgs := msgMeta.GetNumMessagesInBatch()
-
-	msg := &message{
-		publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
-		eventTime:   timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
-		key:         msgMeta.GetPartitionKey(),
-		properties:  internal.ConvertToStringMap(msgMeta.GetProperties()),
-		topic:       pc.topic,
-		msgID:       id,
-		payLoad:     payload,
-	}
+	for _, payload := range payloadList {
+		msg := &message{
+			publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+			eventTime:   timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
+			key:         msgMeta.GetPartitionKey(),
+			properties:  internal.ConvertToStringMap(msgMeta.GetProperties()),
+			topic:       pc.topic,
+			msgID:       id,
+			payLoad:     payload,
+		}
 
-	consumerMsg := ConsumerMessage{
-		Message:  msg,
-		Consumer: pc,
-	}
+		consumerMsg := ConsumerMessage{
+			Message:  msg,
+			Consumer: pc,
+		}
 
-	select {
-	case pc.subQueue <- consumerMsg:
-		// Add messageId to Overflow buffer, avoiding duplicates.
-		newMid := response.GetMessageId()
-		var dup bool
-
-		pc.omu.Lock()
-		for _, mid := range pc.overflow {
-			if proto.Equal(mid, newMid) {
-				dup = true
-				break
+		select {
+		case pc.subQueue <- consumerMsg:
+			//Add messageId to redeliverMessages buffer, avoiding duplicates.
+			newMid := response.GetMessageId()
+			var dup bool
+
+			pc.omu.Lock()
+			for _, mid := range pc.redeliverMessages {
+				if proto.Equal(mid, newMid) {
+					dup = true
+					break
+				}
 			}
-		}
 
-		if !dup {
-			pc.overflow = append(pc.overflow, newMid)
+			if !dup {
+				pc.redeliverMessages = append(pc.redeliverMessages, newMid)
+			}
+			pc.omu.Unlock()
+			continue
+		default:
+			return fmt.Errorf("consumer message channel on topic %s is full (capacity = %d)", pc.Topic(), cap(pc.options.MessageChannel))
 		}
-		pc.omu.Unlock()
-		return nil
-	default:
-		return fmt.Errorf("consumer message channel on topic %s is full (capacity = %d)", pc.Topic(), cap(pc.options.MessageChannel))
 	}
+
+	return nil
 }
 
 type handleAck struct {
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index 98e2156..e00c1e1 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -34,7 +34,7 @@ import (
 type producerState int
 
 const (
-	producerInit = iota
+	producerInit producerState = iota
 	producerReady
 	producerClosing
 	producerClosed
@@ -249,7 +249,8 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 	sequenceID := internal.GetAndAdd(p.sequenceIDGenerator, 1)
 
 	if sendAsBatch {
-		for p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters) == false {
+		ok := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters)
+		if ok == false {
 			// The current batch is full.. flush it and retry
 			p.internalFlushCurrentBatch()
 		}
@@ -321,13 +322,25 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) erro
 func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
 	callback func(MessageID, *ProducerMessage, error)) {
 	p.publishSemaphore.Acquire()
-	p.eventsChan <- &sendRequest{ctx, msg, callback, false}
+	sr := &sendRequest{
+		ctx:              ctx,
+		msg:              msg,
+		callback:         callback,
+		flushImmediately: false,
+	}
+	p.eventsChan <- sr
 }
 
 func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
 	callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
 	p.publishSemaphore.Acquire()
-	p.eventsChan <- &sendRequest{ctx, msg, callback, flushImmediately}
+	sr := &sendRequest{
+		ctx:              ctx,
+		msg:              msg,
+		callback:         callback,
+		flushImmediately: flushImmediately,
+	}
+	p.eventsChan <- sr
 }
 
 func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 04545c1..1f29f7f 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -87,7 +87,7 @@ func addSingleMessageToBatch(wb Buffer, smm *pb.SingleMessageMetadata, payload [
 	wb.Write(payload)
 }
 
-func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payload []byte, err error) {
+func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payloadList [][]byte, err error) {
 	// reusable buffer for 4-byte uint32s
 	buf32 := make([]byte, 4)
 	r := bytes.NewReader(headersAndPayload)
@@ -164,33 +164,63 @@ func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payloa
 		return nil, nil, err
 	}
 
+	numMsg := msgMeta.GetNumMessagesInBatch()
+
+	if numMsg > 0 && msgMeta.NumMessagesInBatch != nil {
+		payloads := make([]byte, lr.N)
+		if _, err = io.ReadFull(lr, payloads); err != nil {
+			return nil, nil, err
+		}
+
+		singleMessages, err := decodeBatchPayload(payloads, numMsg)
+		if err != nil {
+			return nil, nil, err
+		}
+
+		payloadList = make([][]byte, 0, numMsg)
+		for _, singleMsg := range singleMessages {
+			msgMeta.PartitionKey = singleMsg.SingleMeta.PartitionKey
+			msgMeta.Properties = singleMsg.SingleMeta.Properties
+			msgMeta.EventTime = singleMsg.SingleMeta.EventTime
+			payloadList = append(payloadList, singleMsg.SinglePayload)
+		}
+
+		if err := computeChecksum(chksum, expectedChksum); err != nil {
+			return nil, nil, err
+		}
+		return msgMeta, payloadList, nil
+	}
 	// Anything left in the frame is considered
 	// the payload and can be any sequence of bytes.
-	payloads := make([]byte, lr.N)
-	if _, err = io.ReadFull(lr, payloads); err != nil {
-		return nil, nil, err
-	}
+	payloadList = make([][]byte, 0, 10)
+	if lr.N > 0 {
+		// guard against allocating large buffer
+		if lr.N > MaxFrameSize {
+			return nil, nil, fmt.Errorf("frame payload size (%d) cannot be greater than max frame size (%d)", lr.N, MaxFrameSize)
+		}
 
-	numMsg := msgMeta.GetNumMessagesInBatch()
+		payload := make([]byte, lr.N)
+		if _, err = io.ReadFull(lr, payload); err != nil {
+			return nil, nil, err
+		}
 
-	singleMessages, err := decodeBatchPayload(payloads, numMsg)
-	if err != nil {
-		return nil, nil, err
+		payloadList = append(payloadList, payload)
 	}
 
-	for _, singleMsg := range singleMessages {
-		payload = singleMsg.SinglePayload
-		msgMeta.PartitionKey = singleMsg.SingleMeta.PartitionKey
-		msgMeta.Properties = singleMsg.SingleMeta.Properties
-		msgMeta.EventTime = singleMsg.SingleMeta.EventTime
+	if err := computeChecksum(chksum, expectedChksum); err != nil {
+		return nil, nil, err
 	}
 
-	if computed := chksum.compute(); !bytes.Equal(computed, expectedChksum) {
-		return nil, nil, fmt.Errorf("checksum mismatch: computed (0x%X) does "+
+	return msgMeta, payloadList, nil
+}
+
+func computeChecksum(chksum CheckSum, expectedChksum []byte) error {
+	computed := chksum.compute()
+	if !bytes.Equal(computed, expectedChksum) {
+		return fmt.Errorf("checksum mismatch: computed (0x%X) does "+
 			"not match given checksum (0x%X)", computed, expectedChksum)
 	}
-
-	return msgMeta, payload, nil
+	return nil
 }
 
 func serializeBatch(wb Buffer, cmdSend *pb.BaseCommand, msgMetadata *pb.MessageMetadata, payload []byte) {
@@ -252,7 +282,7 @@ type singleMessage struct {
 func decodeBatchPayload(bp []byte, batchNum int32) ([]*singleMessage, error) {
 	buf32 := make([]byte, 4)
 	rdBuf := bytes.NewReader(bp)
-	list := make([]*singleMessage, 0, batchNum)
+	singleMsgList := make([]*singleMessage, 0, batchNum)
 	for i := int32(0); i < batchNum; i++ {
 		// singleMetaSize
 		if _, err := io.ReadFull(rdBuf, buf32); err != nil {
@@ -274,13 +304,15 @@ func decodeBatchPayload(bp []byte, batchNum int32) ([]*singleMessage, error) {
 		if _, err := io.ReadFull(rdBuf, singlePayload); err != nil {
 			return nil, err
 		}
-		d := &singleMessage{}
-		d.SingleMetaSize = singleMetaSize
-		d.SingleMeta = singleMeta
-		d.SinglePayload = singlePayload
-		list = append(list, d)
+		singleMsg := &singleMessage{
+			SingleMetaSize: singleMetaSize,
+			SingleMeta:     singleMeta,
+			SinglePayload:  singlePayload,
+		}
+
+		singleMsgList = append(singleMsgList, singleMsg)
 	}
-	return list, nil
+	return singleMsgList, nil
 }
 
 // ConvertFromStringMap convert a string map to a KeyValue []byte
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index d83f30b..2c707ea 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -64,7 +64,7 @@ type Connection interface {
 }
 
 type ConsumerHandler interface {
-    HandlerMessage(response *pb.CommandMessage, headersAndPayload []byte) error
+	MessageReceived(response *pb.CommandMessage, headersAndPayload []byte) error
 }
 
 type connectionState int
@@ -131,7 +131,7 @@ func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSO
 		incomingRequests: make(chan *request),
 		writeRequests:    make(chan []byte),
 		listeners:        make(map[uint64]ConnectionListener),
-        connWrapper:      NewConnWrapper(),
+		connWrapper:      NewConnWrapper(),
 	}
 	cnx.reader = newConnectionReader(cnx)
 	cnx.cond = sync.NewCond(cnx)
@@ -307,7 +307,7 @@ func (c *connection) writeCommand(cmd proto.Message) {
 func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []byte) {
 	c.log.Debugf("Received command: %s -- payload: %v", cmd, headersAndPayload)
 	c.lastDataReceivedTime = time.Now()
-    var err error
+	var err error
 
 	switch *cmd.Type {
 	case pb.BaseCommand_SUCCESS:
@@ -344,7 +344,7 @@ func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []by
 	case pb.BaseCommand_SEND_ERROR:
 
 	case pb.BaseCommand_MESSAGE:
-        err = c.handleMessage(cmd.GetMessage(), headersAndPayload)
+		err = c.handleMessage(cmd.GetMessage(), headersAndPayload)
 	case pb.BaseCommand_PING:
 		c.handlePing()
 	case pb.BaseCommand_PONG:
@@ -353,9 +353,9 @@ func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []by
 	case pb.BaseCommand_ACTIVE_CONSUMER_CHANGE:
 
 	default:
-        if err != nil {
-            c.log.Errorf("Received invalid command type: %s", cmd.Type)
-        }
+		if err != nil {
+			c.log.Errorf("Received invalid command type: %s", cmd.Type)
+		}
 		c.Close()
 	}
 }
@@ -403,18 +403,18 @@ func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
 }
 
 func (c *connection) handleMessage(response *pb.CommandMessage, payload []byte) error {
-    c.log.Debug("Got Message: ", response)
-    consumerId := response.GetConsumerId()
-    if consumer, ok := c.connWrapper.Consumers[consumerId]; ok {
-        err := consumer.HandlerMessage(response, payload)
-        if err != nil {
-            c.log.WithField("consumerId", consumerId).Error("handle message err: ", response.MessageId)
-            return errors.New("handler not found")
-        }
-    } else {
-        c.log.WithField("consumerId", consumerId).Warn("Got unexpected message: ", response.MessageId)
-    }
-    return nil
+	c.log.Debug("Got Message: ", response)
+	consumerId := response.GetConsumerId()
+	if consumer, ok := c.connWrapper.Consumers[consumerId]; ok {
+		err := consumer.MessageReceived(response, payload)
+		if err != nil {
+			c.log.WithField("consumerId", consumerId).Error("handle message err: ", response.MessageId)
+			return errors.New("handler not found")
+		}
+	} else {
+		c.log.WithField("consumerId", consumerId).Warn("Got unexpected message: ", response.MessageId)
+	}
+	return nil
 }
 
 func (c *connection) sendPing() {
@@ -522,8 +522,8 @@ func (c *connection) getTLSConfig() (*tls.Config, error) {
 }
 
 type ConnWrapper struct {
-	Rwmu             sync.RWMutex
-	Consumers        map[uint64]ConsumerHandler
+	Rwmu      sync.RWMutex
+	Consumers map[uint64]ConsumerHandler
 }
 
 func NewConnWrapper() *ConnWrapper {
@@ -533,13 +533,13 @@ func NewConnWrapper() *ConnWrapper {
 }
 
 func (c *connection) AddConsumeHandler(id uint64, handler ConsumerHandler) {
-    c.connWrapper.Rwmu.Lock()
-    c.connWrapper.Consumers[id] = handler
-    c.connWrapper.Rwmu.Unlock()
+	c.connWrapper.Rwmu.Lock()
+	c.connWrapper.Consumers[id] = handler
+	c.connWrapper.Rwmu.Unlock()
 }
 
 func (c *connection) DeleteConsumeHandler(id uint64) {
-    c.connWrapper.Rwmu.Lock()
-    delete(c.connWrapper.Consumers, id)
-    c.connWrapper.Rwmu.Unlock()
+	c.connWrapper.Rwmu.Lock()
+	delete(c.connWrapper.Consumers, id)
+	c.connWrapper.Rwmu.Unlock()
 }
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 66683d0..e9327d0 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -20,6 +20,7 @@ package pulsar
 import (
 	"context"
 	"fmt"
+	"net/http"
 	"sync"
 	"testing"
 	"time"
@@ -30,6 +31,55 @@ import (
 	log "github.com/sirupsen/logrus"
 )
 
+func TestInvalidURL(t *testing.T) {
+	client, err := NewClient(ClientOptions{})
+
+	if client != nil || err == nil {
+		t.Fatal("Should have failed to create client")
+	}
+}
+
+func TestProducerConnectError(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://invalid-hostname:6650",
+	})
+
+	assert.Nil(t, err)
+
+	defer client.Close()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: newTopicName(),
+	})
+
+	// Expect error in creating producer
+	assert.Nil(t, producer)
+	assert.NotNil(t, err)
+
+	assert.Equal(t, err.Error(), "connection error")
+}
+
+func TestProducerNoTopic(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://localhost:6650",
+	})
+
+	if err != nil {
+		t.Fatal(err)
+		return
+	}
+
+	defer client.Close()
+
+	producer, err := client.CreateProducer(ProducerOptions{})
+
+	// Expect error in creating producer
+	assert.Nil(t, producer)
+	assert.NotNil(t, err)
+
+	assert.Equal(t, err.(*Error).Result(), ResultInvalidTopicName)
+}
+
 func TestSimpleProducer(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL: serviceURL,
@@ -92,7 +142,8 @@ func TestProducerAsyncSend(t *testing.T) {
 		assert.NoError(t, err)
 	}
 
-	producer.Flush()
+	err = producer.Flush()
+	assert.Nil(t, err)
 
 	wg.Wait()
 
@@ -181,6 +232,213 @@ func TestProducerLastSequenceID(t *testing.T) {
 	assert.NoError(t, err)
 }
 
+func TestEventTime(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: serviceURL,
+	})
+	assert.NoError(t, err)
+	defer client.Close()
+
+	topicName := "test-event-time"
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topicName,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "subName",
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	eventTime := timeFromUnixTimestampMillis(uint64(1565161612))
+	err = producer.Send(context.Background(), &ProducerMessage{
+		Payload:   []byte(fmt.Sprintf("test-event-time")),
+		EventTime: &eventTime,
+	})
+	assert.Nil(t, err)
+
+	msg, err := consumer.Receive(context.Background())
+	assert.Nil(t, err)
+	actualEventTime := msg.EventTime()
+	assert.Equal(t, eventTime.Unix(), actualEventTime.Unix())
+}
+
+func TestFlushInProducer(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: serviceURL,
+	})
+	assert.NoError(t, err)
+	defer client.Close()
+
+	topicName := "test-flush-in-producer"
+	subName := "subscription-name"
+	numOfMessages := 10
+	ctx := context.Background()
+
+	// set the batch size to numOfMessages messages and the max publish delay to 10s
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:                   topicName,
+		DisableBatching:         false,
+		BatchingMaxMessages:     uint(numOfMessages),
+		BatchingMaxPublishDelay: time.Second * 10,
+		BlockIfQueueFull:        true,
+		Properties: map[string]string{
+			"producer-name": "test-producer-name",
+			"producer-id":   "test-producer-id",
+		},
+	})
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: subName,
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	prefix := "msg-batch-async"
+	msgCount := 0
+
+	wg := sync.WaitGroup{}
+	wg.Add(5)
+	errors := util.NewBlockingQueue(10)
+	for i := 0; i < numOfMessages/2; i++ {
+		messageContent := prefix + fmt.Sprintf("%d", i)
+		producer.SendAsync(ctx, &ProducerMessage{
+			Payload: []byte(messageContent),
+		}, func(id MessageID, producerMessage *ProducerMessage, e error) {
+			if e != nil {
+				log.WithError(e).Error("Failed to publish")
+				errors.Put(e)
+			} else {
+				log.Info("Published message ", id)
+			}
+			wg.Done()
+		})
+		assert.Nil(t, err)
+	}
+	err = producer.Flush()
+	assert.Nil(t, err)
+	wg.Wait()
+
+	for i := 0; i < numOfMessages/2; i++ {
+		_, err := consumer.Receive(ctx)
+		assert.Nil(t, err)
+		msgCount++
+	}
+
+	assert.Equal(t, msgCount, numOfMessages/2)
+
+	wg.Add(5)
+	for i := numOfMessages / 2; i < numOfMessages; i++ {
+		messageContent := prefix + fmt.Sprintf("%d", i)
+		producer.SendAsync(ctx, &ProducerMessage{
+			Payload: []byte(messageContent),
+		}, func(id MessageID, producerMessage *ProducerMessage, e error) {
+			if e != nil {
+				log.WithError(e).Error("Failed to publish")
+				errors.Put(e)
+			} else {
+				log.Info("Published message ", id)
+			}
+			wg.Done()
+		})
+		assert.Nil(t, err)
+	}
+
+	err = producer.Flush()
+	assert.Nil(t, err)
+	wg.Wait()
+
+	for i := numOfMessages / 2; i < numOfMessages; i++ {
+		_, err := consumer.Receive(ctx)
+		assert.Nil(t, err)
+		msgCount++
+	}
+	assert.Equal(t, msgCount, numOfMessages)
+}
+
+func TestFlushInPartitionedProducer(t *testing.T) {
+	topicName := "persistent://public/default/partition-testFlushInPartitionedProducer"
+
+	// call admin api to make it partitioned
+	url := adminURL + "/" + "admin/v2/" + topicName + "/partitions"
+	makeHTTPCall(t, http.MethodPut, url, "5")
+
+	numberOfPartitions := 5
+	numOfMessages := 10
+	ctx := context.Background()
+
+	// create client connection
+	client, err := NewClient(ClientOptions{
+		URL: serviceURL,
+	})
+	assert.NoError(t, err)
+	defer client.Close()
+
+	// create consumer
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "my-sub",
+		Type:             Exclusive,
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	// create producer with a batch size of numOfMessages/numberOfPartitions messages and a max publish delay of 10s
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:                   topicName,
+		DisableBatching:         false,
+		BatchingMaxMessages:     uint(numOfMessages / numberOfPartitions),
+		BatchingMaxPublishDelay: time.Second * 10,
+		BlockIfQueueFull:        true,
+	})
+	defer producer.Close()
+
+	// send 5 messages
+	prefix := "msg-batch-async-"
+	wg := sync.WaitGroup{}
+	wg.Add(5)
+	errors := util.NewBlockingQueue(5)
+	for i := 0; i < numOfMessages/2; i++ {
+		messageContent := prefix + fmt.Sprintf("%d", i)
+		producer.SendAsync(ctx, &ProducerMessage{
+			Payload: []byte(messageContent),
+		}, func(id MessageID, producerMessage *ProducerMessage, e error) {
+			if e != nil {
+				log.WithError(e).Error("Failed to publish")
+				errors.Put(e)
+			} else {
+				log.Info("Published message: ", id)
+			}
+			wg.Done()
+		})
+		assert.Nil(t, err)
+	}
+
+	// After flush, should be able to consume.
+	err = producer.Flush()
+	assert.Nil(t, err)
+
+	wg.Wait()
+
+	// Receive all messages
+	msgCount := 0
+	for i := 0; i < numOfMessages/2; i++ {
+		msg, err := consumer.Receive(ctx)
+		fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
+			msg.ID(), string(msg.Payload()))
+		assert.Nil(t, err)
+		err = consumer.Ack(msg)
+		assert.Nil(t, err)
+		msgCount++
+	}
+	assert.Equal(t, msgCount, numOfMessages/2)
+}
+
 func TestMessageRouter(t *testing.T) {
 	// Create topic with 5 partitions
 	httpPut("http://localhost:8080/admin/v2/persistent/public/default/my-partitioned-topic/partitions", 5)
diff --git a/pulsar/unackedMsgTracker.go b/pulsar/unackedMsgTracker.go
index 8ec51c6..c46b731 100644
--- a/pulsar/unackedMsgTracker.go
+++ b/pulsar/unackedMsgTracker.go
@@ -34,7 +34,6 @@ type UnackedMessageTracker struct {
 	oldOpenSet set.Set
 	timeout    *time.Ticker
 
-	pc  *partitionConsumer
 	pcs []*partitionConsumer
 }
 
@@ -159,22 +158,7 @@ func (t *UnackedMessageTracker) handlerCmd(ackTimeoutMillis int64) {
 
 				t.oldOpenSet.Clear()
 
-				if t.pc != nil {
-					requestID := t.pc.client.rpcClient.NewRequestID()
-					cmd := &pb.CommandRedeliverUnacknowledgedMessages{
-						ConsumerId: proto.Uint64(t.pc.consumerID),
-						MessageIds: messageIds,
-					}
-
-					_, err := t.pc.client.rpcClient.RequestOnCnx(t.pc.cnx, requestID,
-						pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, cmd)
-					if err != nil {
-						t.pc.log.WithError(err).Error("Failed to unsubscribe consumer")
-						return
-					}
-
-					log.Debugf("consumer:%v redeliver messages num:%d", t.pc.consumerName, len(messageIds))
-				} else if t.pcs != nil {
+				if t.pcs != nil {
 					messageIdsMap := make(map[int32][]*pb.MessageIdData)
 					for _, msgID := range messageIds {
 						messageIdsMap[msgID.GetPartition()] = append(messageIdsMap[msgID.GetPartition()], msgID)
@@ -198,7 +182,7 @@ func (t *UnackedMessageTracker) handlerCmd(ackTimeoutMillis int64) {
 					}
 				}
 			}
-			log.Debug("Tick at ", tick)
+			log.Debugf("Tick at: %v", tick)
 		}
 
 		t.toggle()
diff --git a/util/util.go b/util/util.go
index 06a7e53..bd4f5d6 100644
--- a/util/util.go
+++ b/util/util.go
@@ -18,15 +18,27 @@
 package util
 
 import (
-    `reflect`
+	"reflect"
 )
 
 // IsNil check if the interface is nil
 func IsNil(i interface{}) bool {
-    vi := reflect.ValueOf(i)
-    if vi.Kind() == reflect.Ptr {
-        return vi.IsNil()
-    }
-    return false
+	vi := reflect.ValueOf(i)
+	if vi.Kind() == reflect.Ptr {
+		return vi.IsNil()
+	}
+	return false
 }
 
+// RemoveDuplicateElement removes duplicate elements from the string slice, keeping the first occurrence of each in order.
+func RemoveDuplicateElement(addrs []string) []string {
+	result := make([]string, 0, len(addrs))
+	temp := map[string]struct{}{}
+	for _, item := range addrs {
+		if _, ok := temp[item]; !ok {
+			temp[item] = struct{}{}
+			result = append(result, item)
+		}
+	}
+	return result
+}
diff --git a/util/util_test.go b/util/util_test.go
index 2e1195c..284dd0c 100644
--- a/util/util_test.go
+++ b/util/util_test.go
@@ -18,14 +18,23 @@
 package util
 
 import (
-    `github.com/stretchr/testify/assert`
-    `testing`
+	"fmt"
+	"github.com/stretchr/testify/assert"
+	"strings"
+	"testing"
 )
 
 func TestIsNil(t *testing.T) {
-    var a interface{} = nil
-    var b interface{} = (*int)(nil)
+	var a interface{} = nil
+	var b interface{} = (*int)(nil)
 
-    assert.True(t, a == nil)
-    assert.False(t, b == nil)
+	assert.True(t, a == nil)
+	assert.False(t, b == nil)
+}
+
+func TestRemoveDuplicateElement(t *testing.T) {
+	s := []string{"hello", "world", "hello", "golang", "hello", "ruby", "php", "java"}
+	resList := RemoveDuplicateElement(s)
+	res := fmt.Sprintf("%s", resList[:])
+	assert.Equal(t, 1, strings.Count(res, "hello"))
 }