You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2019/08/14 03:00:06 UTC
[pulsar-client-go] branch master updated: Support partition
consumer receive async and fix batch logic (#43)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 448387d Support partition consumer receive async and fix batch logic (#43)
448387d is described below
commit 448387d738a2f3af4c8232daa4fac9576d252617
Author: 冉小龙 <ra...@gmail.com>
AuthorDate: Wed Aug 14 11:00:00 2019 +0800
Support partition consumer receive async and fix batch logic (#43)
Signed-off-by: xiaolong.ran ranxiaolong716@gmail.com
* Support batch logic for project
* add unit test case of event time
* add some unit test cases for producer
* fix error result type
* add unit test case of producer flush
* add receiver queue size test logic
* support partition consumer receive async
* add unit test case of ack timeout
* Fix consumer receiving message out of order
---
pulsar/consumer.go | 3 +
pulsar/consumer_test.go | 301 +++++++++++++++++++++++++++++++++++++-
pulsar/error.go | 2 +-
pulsar/impl_consumer.go | 55 +++++--
pulsar/impl_partition_consumer.go | 217 ++++++++++++++++-----------
pulsar/impl_partition_producer.go | 21 ++-
pulsar/internal/commands.go | 82 +++++++----
pulsar/internal/connection.go | 54 +++----
pulsar/producer_test.go | 260 +++++++++++++++++++++++++++++++-
pulsar/unackedMsgTracker.go | 20 +--
util/util.go | 24 ++-
util/util_test.go | 21 ++-
12 files changed, 870 insertions(+), 190 deletions(-)
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index de190e0..c259cd6 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -142,6 +142,9 @@ type Consumer interface {
// ReceiveAsync appends the message to the msgs channel asynchronously.
ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error
+ // ReceiveAsyncWithCallback passes the received message and any error to the given callback
+ ReceiveAsyncWithCallback(ctx context.Context, callback func(msg Message, err error))
+
// Ack the consumption of a single message
Ack(Message) error
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 39646d3..6fe86cd 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package pulsar
@@ -124,6 +122,67 @@ func TestConsumerConnectError(t *testing.T) {
assert.Equal(t, err.Error(), "connection error")
}
+func TestBatchMessageReceive(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topicName := "persistent://public/default/receive-batch"
+ subName := "subscription-name"
+ prefix := "msg-batch-"
+ ctx := context.Background()
+
+ // Enable batching on producer side
+ batchSize, numOfMessages := 2, 100
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ BatchingMaxMessages: uint(batchSize),
+ DisableBatching: false,
+ BlockIfQueueFull: true,
+ })
+ assert.Nil(t, err)
+ assert.Equal(t, topicName, producer.Topic())
+ defer producer.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: subName,
+ })
+ assert.Equal(t, topicName, consumer.Topic())
+ count := 0
+
+ for i := 0; i < numOfMessages; i++ {
+ messageContent := prefix + fmt.Sprintf("%d", i)
+ msg := &ProducerMessage{
+ Payload: []byte(messageContent),
+ }
+ err := producer.Send(ctx, msg)
+ assert.Nil(t, err)
+ }
+
+ for i := 0; i < numOfMessages; i++ {
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ err = consumer.Ack(msg)
+ assert.Nil(t, err)
+ count++
+ }
+
+ // check strategically
+ for i := 0; i < 3; i++ {
+ if count == numOfMessages {
+ break
+ }
+ time.Sleep(time.Second)
+ }
+ assert.Equal(t, count, numOfMessages)
+}
+
func TestConsumerWithInvalidConf(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
@@ -263,7 +322,7 @@ func TestConsumerKeyShared(t *testing.T) {
assert.Nil(t, err)
}
- time.Sleep(time.Second * 5)
+ time.Sleep(time.Second * 1)
go func() {
for i := 0; i < 10; i++ {
@@ -288,6 +347,8 @@ func TestConsumerKeyShared(t *testing.T) {
}
}
}()
+
+ time.Sleep(time.Second * 1)
}
func TestPartitionTopicsConsumerPubSub(t *testing.T) {
@@ -300,7 +361,7 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
topic := "persistent://public/default/testGetPartitions"
testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions/partitions"
- makeHTTPCall(t, http.MethodPut, testURL, "3")
+ makeHTTPCall(t, http.MethodPut, testURL, "5")
// create producer
producer, err := client.CreateProducer(ProducerOptions{
@@ -316,9 +377,10 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
assert.Equal(t, topic+"-partition-2", topics[2])
consumer, err := client.Subscribe(ConsumerOptions{
- Topic: topic,
- SubscriptionName: "my-sub",
- Type: Exclusive,
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ Type: Exclusive,
+ ReceiverQueueSize: 10,
})
assert.Nil(t, err)
defer consumer.Close()
@@ -348,3 +410,228 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
assert.Equal(t, len(msgs), 10)
}
+
+func TestConsumer_ReceiveAsync(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topicName := "persistent://public/default/receive-async"
+ subName := "subscription-receive-async"
+ ctx := context.Background()
+ ch := make(chan ConsumerMessage, 10)
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ })
+ defer producer.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: subName,
+ })
+ defer consumer.Close()
+
+ //send 10 messages
+ for i := 0; i < 10; i++ {
+ err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ })
+ assert.Nil(t, err)
+ }
+
+ //receive async 10 messages
+ err = consumer.ReceiveAsync(ctx, ch)
+ assert.Nil(t, err)
+
+ payloadList := make([]string, 0, 10)
+
+RECEIVE:
+ for {
+ select {
+ case cMsg, ok := <-ch:
+ if ok {
+ fmt.Printf("receive message payload is:%s\n", string(cMsg.Payload()))
+ assert.Equal(t, topicName, cMsg.Message.Topic())
+ assert.Equal(t, topicName, cMsg.Consumer.Topic())
+ payloadList = append(payloadList, string(cMsg.Message.Payload()))
+ if len(payloadList) == 10 {
+ break RECEIVE
+ }
+ }
+ continue RECEIVE
+ case <-ctx.Done():
+ t.Error("context error.")
+ return
+ }
+ }
+}
+
+func TestConsumerAckTimeout(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := "test-ack-timeout-topic-1"
+ ctx := context.Background()
+
+ // create consumer
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub1",
+ Type: Shared,
+ AckTimeout: 5 * 1000,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ // create consumer1
+ consumer1, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub2",
+ Type: Shared,
+ AckTimeout: 5 * 1000,
+ })
+ assert.Nil(t, err)
+ defer consumer1.Close()
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ // send 10 messages
+ for i := 0; i < 10; i++ {
+ if err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ }); err != nil {
+ log.Fatal(err)
+ }
+ }
+
+ // consumer receive 10 messages
+ payloadList := make([]string, 0, 10)
+ for i := 0; i < 10; i++ {
+ msg, err := consumer.Receive(context.Background())
+ if err != nil {
+ log.Fatal(err)
+ }
+ payloadList = append(payloadList, string(msg.Payload()))
+
+ // not ack message
+ }
+ assert.Equal(t, 10, len(payloadList))
+
+ // consumer1 receive 10 messages
+ for i := 0; i < 10; i++ {
+ msg, err := consumer1.Receive(context.Background())
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ payloadList = append(payloadList, string(msg.Payload()))
+
+ // ack half of the messages
+ if i%2 == 0 {
+ err = consumer1.Ack(msg)
+ assert.Nil(t, err)
+ }
+ }
+
+ // wait ack timeout
+ time.Sleep(6 * time.Second)
+
+ fmt.Println("start redeliver messages...")
+
+ payloadList = make([]string, 0, 10)
+ // consumer receive messages again
+ for i := 0; i < 10; i++ {
+ msg, err := consumer.Receive(context.Background())
+ if err != nil {
+ log.Fatal(err)
+ }
+ payloadList = append(payloadList, string(msg.Payload()))
+
+ // ack message
+ if err := consumer.Ack(msg); err != nil {
+ log.Fatal(err)
+ }
+ }
+ assert.Equal(t, 10, len(payloadList))
+
+ payloadList = make([]string, 0, 5)
+ // consumer1 receive messages again
+ go func() {
+ for i := 0; i < 10; i++ {
+ msg, err := consumer1.Receive(context.Background())
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ expectMsg := fmt.Sprintf("hello-%d", i)
+ fmt.Printf("redeliver messages, payload is:%s\n", expectMsg)
+ payloadList = append(payloadList, string(msg.Payload()))
+
+ // ack message
+ if err := consumer1.Ack(msg); err != nil {
+ log.Fatal(err)
+ }
+ }
+ assert.Equal(t, 5, len(payloadList))
+ }()
+
+ // sleep 2 seconds to give the goroutine time to receive messages.
+ time.Sleep(time.Second * 2)
+}
+
+func TestConsumer_ReceiveAsyncWithCallback(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topicName := "persistent://public/default/receive-async-with-callback"
+ subName := "subscription-receive-async"
+ ctx := context.Background()
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ })
+ defer producer.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: subName,
+ })
+ defer consumer.Close()
+
+ //send 10 messages
+ for i := 0; i < 10; i++ {
+ err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ })
+ assert.Nil(t, err)
+ }
+
+ for i := 0; i < 10; i++ {
+ consumer.ReceiveAsyncWithCallback(ctx, func(msg Message, err error) {
+ if err != nil {
+ log.Fatal(err)
+ }
+ fmt.Printf("receive message payload is:%s\n", string(msg.Payload()))
+ assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
+ })
+ }
+}
diff --git a/pulsar/error.go b/pulsar/error.go
index 0231913..ec20844 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -24,7 +24,7 @@ type Result int
const (
// ResultOk means no errors
- ResultOk = iota
+ ResultOk Result = iota
// ResultUnknownError means unknown error happened on broker
ResultUnknownError
// ResultInvalidConfiguration means invalid configuration
diff --git a/pulsar/impl_consumer.go b/pulsar/impl_consumer.go
index 0a44971..13e72ae 100644
--- a/pulsar/impl_consumer.go
+++ b/pulsar/impl_consumer.go
@@ -79,6 +79,7 @@ func newConsumer(client *client, options *ConsumerOptions) (*consumer, error) {
func singleTopicSubscribe(client *client, options *ConsumerOptions, topic string) (*consumer, error) {
c := &consumer{
topicName: topic,
+ log: log.WithField("topic", topic),
queue: make(chan ConsumerMessage, options.ReceiverQueueSize),
}
@@ -100,7 +101,7 @@ func singleTopicSubscribe(client *client, options *ConsumerOptions, topic string
for partitionIdx, partitionTopic := range partitions {
go func(partitionIdx int, partitionTopic string) {
- cons, err := newPartitionConsumer(client, partitionTopic, options, partitionIdx)
+ cons, err := newPartitionConsumer(client, partitionTopic, options, partitionIdx, numPartitions, c.queue)
ch <- ConsumerError{
err: err,
partition: partitionIdx,
@@ -153,31 +154,63 @@ func (c *consumer) Unsubscribe() error {
return nil
}
-func (c *consumer) Receive(ctx context.Context) (Message, error) {
+func (c *consumer) getMessageFromSubConsumer(ctx context.Context) {
for _, pc := range c.consumers {
go func(pc Consumer) {
- if err := pc.ReceiveAsync(ctx, c.queue); err != nil {
+ err := pc.ReceiveAsync(ctx, c.queue)
+ if err != nil {
return
}
}(pc)
}
+}
- select {
- case <-ctx.Done():
- return nil, ctx.Err()
- case msg, ok := <-c.queue:
- if ok {
- return msg.Message, nil
+func (c *consumer) Receive(ctx context.Context) (message Message, err error) {
+ if len(c.consumers) > 1 {
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ case cMsg, ok := <-c.queue:
+ if ok {
+ return cMsg.Message, nil
+ }
+ return nil, errors.New("receive message error")
}
- return nil, errors.New("receive message error")
}
+
+ return c.consumers[0].(*partitionConsumer).Receive(ctx)
}
func (c *consumer) ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error {
- //TODO: impl logic
+ for _, pc := range c.consumers {
+ go func(pc Consumer) {
+ if err := pc.ReceiveAsync(ctx, msgs); err != nil {
+ c.log.Errorf("receive async messages error:%s, please check.", err.Error())
+ return
+ }
+ }(pc)
+ }
+
return nil
}
+func (c *consumer) ReceiveAsyncWithCallback(ctx context.Context, callback func(msg Message, err error)) {
+ var err error
+ if len(c.consumers) > 1 {
+ select {
+ case <-ctx.Done():
+ c.log.Errorf("ReceiveAsyncWithCallback: receive message error:%s", ctx.Err().Error())
+ return
+ case cMsg, ok := <-c.queue:
+ if ok {
+ callback(cMsg.Message, err)
+ }
+ return
+ }
+ }
+ c.consumers[0].(*partitionConsumer).ReceiveAsyncWithCallback(ctx, callback)
+}
+
//Ack the consumption of a single message
func (c *consumer) Ack(msg Message) error {
return c.AckID(msg.ID())
diff --git a/pulsar/impl_partition_consumer.go b/pulsar/impl_partition_consumer.go
index 0d7069f..87cf68b 100644
--- a/pulsar/impl_partition_consumer.go
+++ b/pulsar/impl_partition_consumer.go
@@ -37,7 +37,7 @@ const maxRedeliverUnacknowledged = 1000
type consumerState int
const (
- consumerInit = iota
+ consumerInit consumerState = iota
consumerReady
consumerClosing
consumerClosed
@@ -60,16 +60,17 @@ type partitionConsumer struct {
consumerID uint64
subQueue chan ConsumerMessage
- omu sync.Mutex // protects following
- overflow []*pb.MessageIdData
+ omu sync.Mutex // protects following
+ redeliverMessages []*pb.MessageIdData
unAckTracker *UnackedMessageTracker
eventsChan chan interface{}
partitionIdx int
+ partitionNum int
}
-func newPartitionConsumer(client *client, topic string, options *ConsumerOptions, partitionID int) (*partitionConsumer, error) {
+func newPartitionConsumer(client *client, topic string, options *ConsumerOptions, partitionID, partitionNum int, ch chan ConsumerMessage) (*partitionConsumer, error) {
c := &partitionConsumer{
state: consumerInit,
client: client,
@@ -78,6 +79,7 @@ func newPartitionConsumer(client *client, topic string, options *ConsumerOptions
log: log.WithField("topic", topic),
consumerID: client.rpcClient.NewConsumerID(),
partitionIdx: partitionID,
+ partitionNum: partitionNum,
eventsChan: make(chan interface{}),
subQueue: make(chan ConsumerMessage, options.ReceiverQueueSize),
}
@@ -108,7 +110,7 @@ func newPartitionConsumer(client *client, topic string, options *ConsumerOptions
if options.Type == Shared || options.Type == KeyShared {
if options.AckTimeout != 0 {
c.unAckTracker = NewUnackedMessageTracker()
- c.unAckTracker.pc = c
+ c.unAckTracker.pcs = append(c.unAckTracker.pcs, c)
c.unAckTracker.Start(int64(options.AckTimeout))
}
}
@@ -128,6 +130,18 @@ func newPartitionConsumer(client *client, topic string, options *ConsumerOptions
c.log = c.log.WithField("name", c.consumerName)
c.log.Info("Created consumer")
c.state = consumerReady
+
+ // For a partitioned topic, start a goroutine that asynchronously forwards messages
+ // from this partition consumer into the queue channel (ch) passed in by the caller.
+ if partitionNum > 1 {
+ go func() {
+ err = c.ReceiveAsync(context.Background(), ch)
+ if err != nil {
+ return
+ }
+ }()
+ }
+
go c.runEventsLoop()
return c, nil
@@ -238,35 +252,60 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *handleUnsubscribe) {
unsub.waitGroup.Done()
}
-func (pc *partitionConsumer) Receive(ctx context.Context) (Message, error) {
- select {
- case <-ctx.Done():
- return nil, ctx.Err()
- case cm, ok := <-pc.subQueue:
- if ok {
- id := &pb.MessageIdData{}
- err := proto.Unmarshal(cm.ID().Serialize(), id)
- if err != nil {
- pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
- return nil, err
- }
- if pc.unAckTracker != nil {
- pc.unAckTracker.Add(id)
- }
- return cm.Message, nil
- }
- return nil, newError(ResultConnectError, "receive queue closed")
+func (pc *partitionConsumer) trackMessage(msgID MessageID) error {
+ id := &pb.MessageIdData{}
+ err := proto.Unmarshal(msgID.Serialize(), id)
+ if err != nil {
+ pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
+ return err
+ }
+ if pc.unAckTracker != nil {
+ pc.unAckTracker.Add(id)
}
+ return nil
}
-func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error {
+func (pc *partitionConsumer) increaseAvailablePermits(receivedSinceFlow uint32) error {
highwater := uint32(math.Max(float64(cap(pc.options.MessageChannel)/2), 1))
+ if receivedSinceFlow >= highwater {
+ if err := pc.internalFlow(receivedSinceFlow); err != nil {
+ pc.log.Errorf("Send Flow cmd error:%s", err.Error())
+ return err
+ }
+ receivedSinceFlow = 0
+ }
+ return nil
+}
+
+func (pc *partitionConsumer) messageProcessed(msgID MessageID, receivedSinceFlow uint32) error {
+ err := pc.trackMessage(msgID)
+ if err != nil {
+ return err
+ }
+ receivedSinceFlow++
- // request half the buffer's capacity
- if err := pc.internalFlow(highwater); err != nil {
- pc.log.Errorf("Send Flow cmd error:%s", err.Error())
+ err = pc.increaseAvailablePermits(receivedSinceFlow)
+ if err != nil {
return err
}
+
+ return nil
+}
+
+func (pc *partitionConsumer) Receive(ctx context.Context) (message Message, err error) {
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+ pc.ReceiveAsyncWithCallback(ctx, func(msg Message, e error) {
+ message = msg
+ err = e
+ wg.Done()
+ })
+ wg.Wait()
+
+ return message, err
+}
+
+func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error {
var receivedSinceFlow uint32
for {
@@ -274,30 +313,38 @@ func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- Consu
case tmpMsg, ok := <-pc.subQueue:
if ok {
msgs <- tmpMsg
- id := &pb.MessageIdData{}
- err := proto.Unmarshal(tmpMsg.ID().Serialize(), id)
+
+ err := pc.messageProcessed(tmpMsg.ID(), receivedSinceFlow)
if err != nil {
- pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
return err
}
- if pc.unAckTracker != nil {
- pc.unAckTracker.Add(id)
- }
- receivedSinceFlow++
- if receivedSinceFlow >= highwater {
- if err := pc.internalFlow(receivedSinceFlow); err != nil {
- pc.log.Errorf("Send Flow cmd error:%s", err.Error())
- return err
- }
- receivedSinceFlow = 0
- }
continue
}
+ break
case <-ctx.Done():
return ctx.Err()
}
}
+}
+func (pc *partitionConsumer) ReceiveAsyncWithCallback(ctx context.Context, callback func(msg Message, err error)) {
+ var receivedSinceFlow uint32
+ var err error
+
+ select {
+ case tmpMsg, ok := <-pc.subQueue:
+ if ok {
+ err = pc.messageProcessed(tmpMsg.ID(), receivedSinceFlow)
+ callback(tmpMsg.Message, err)
+ if err != nil {
+ pc.log.Errorf("processed messages error:%s", err.Error())
+ return
+ }
+ }
+ case <-ctx.Done():
+ pc.log.Errorf("context shouldn't done, please check error:%s", ctx.Err().Error())
+ return
+ }
}
func (pc *partitionConsumer) Ack(msg Message) error {
@@ -465,23 +512,23 @@ func (pc *partitionConsumer) internalRedeliver(redeliver *handleRedeliver) {
pc.omu.Lock()
defer pc.omu.Unlock()
- overFlowSize := len(pc.overflow)
+ redeliverMessagesSize := len(pc.redeliverMessages)
- if overFlowSize == 0 {
+ if redeliverMessagesSize == 0 {
return
}
requestID := pc.client.rpcClient.NewRequestID()
- for i := 0; i < len(pc.overflow); i += maxRedeliverUnacknowledged {
+ for i := 0; i < len(pc.redeliverMessages); i += maxRedeliverUnacknowledged {
end := i + maxRedeliverUnacknowledged
- if end > overFlowSize {
- end = overFlowSize
+ if end > redeliverMessagesSize {
+ end = redeliverMessagesSize
}
_, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.cnx, requestID,
pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, &pb.CommandRedeliverUnacknowledgedMessages{
ConsumerId: proto.Uint64(pc.consumerID),
- MessageIds: pc.overflow[i:end],
+ MessageIds: pc.redeliverMessages[i:end],
})
if err != nil {
pc.log.WithError(err).Error("Failed to unsubscribe consumer")
@@ -489,8 +536,8 @@ func (pc *partitionConsumer) internalRedeliver(redeliver *handleRedeliver) {
}
}
- // clear Overflow slice
- pc.overflow = nil
+ // clear redeliverMessages slice
+ pc.redeliverMessages = nil
if pc.unAckTracker != nil {
pc.unAckTracker.clear()
@@ -574,56 +621,58 @@ func (pc *partitionConsumer) internalFlow(permits uint32) error {
return nil
}
-func (pc *partitionConsumer) HandlerMessage(response *pb.CommandMessage, headersAndPayload []byte) error {
+func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload []byte) error {
msgID := response.GetMessageId()
id := newMessageID(int64(msgID.GetLedgerId()), int64(msgID.GetEntryId()),
int(msgID.GetBatchIndex()), pc.partitionIdx)
- msgMeta, payload, err := internal.ParseMessage(headersAndPayload)
+ msgMeta, payloadList, err := internal.ParseMessage(headersAndPayload)
if err != nil {
return fmt.Errorf("parse message error:%s", err)
}
- //numMsgs := msgMeta.GetNumMessagesInBatch()
-
- msg := &message{
- publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
- eventTime: timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
- key: msgMeta.GetPartitionKey(),
- properties: internal.ConvertToStringMap(msgMeta.GetProperties()),
- topic: pc.topic,
- msgID: id,
- payLoad: payload,
- }
+ for _, payload := range payloadList {
+ msg := &message{
+ publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+ eventTime: timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
+ key: msgMeta.GetPartitionKey(),
+ properties: internal.ConvertToStringMap(msgMeta.GetProperties()),
+ topic: pc.topic,
+ msgID: id,
+ payLoad: payload,
+ }
- consumerMsg := ConsumerMessage{
- Message: msg,
- Consumer: pc,
- }
+ consumerMsg := ConsumerMessage{
+ Message: msg,
+ Consumer: pc,
+ }
- select {
- case pc.subQueue <- consumerMsg:
- // Add messageId to Overflow buffer, avoiding duplicates.
- newMid := response.GetMessageId()
- var dup bool
-
- pc.omu.Lock()
- for _, mid := range pc.overflow {
- if proto.Equal(mid, newMid) {
- dup = true
- break
+ select {
+ case pc.subQueue <- consumerMsg:
+ //Add messageId to redeliverMessages buffer, avoiding duplicates.
+ newMid := response.GetMessageId()
+ var dup bool
+
+ pc.omu.Lock()
+ for _, mid := range pc.redeliverMessages {
+ if proto.Equal(mid, newMid) {
+ dup = true
+ break
+ }
}
- }
- if !dup {
- pc.overflow = append(pc.overflow, newMid)
+ if !dup {
+ pc.redeliverMessages = append(pc.redeliverMessages, newMid)
+ }
+ pc.omu.Unlock()
+ continue
+ default:
+ return fmt.Errorf("consumer message channel on topic %s is full (capacity = %d)", pc.Topic(), cap(pc.options.MessageChannel))
}
- pc.omu.Unlock()
- return nil
- default:
- return fmt.Errorf("consumer message channel on topic %s is full (capacity = %d)", pc.Topic(), cap(pc.options.MessageChannel))
}
+
+ return nil
}
type handleAck struct {
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index 98e2156..e00c1e1 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -34,7 +34,7 @@ import (
type producerState int
const (
- producerInit = iota
+ producerInit producerState = iota
producerReady
producerClosing
producerClosed
@@ -249,7 +249,8 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
sequenceID := internal.GetAndAdd(p.sequenceIDGenerator, 1)
if sendAsBatch {
- for p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters) == false {
+ ok := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters)
+ if ok == false {
// The current batch is full.. flush it and retry
p.internalFlushCurrentBatch()
}
@@ -321,13 +322,25 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) erro
func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
callback func(MessageID, *ProducerMessage, error)) {
p.publishSemaphore.Acquire()
- p.eventsChan <- &sendRequest{ctx, msg, callback, false}
+ sr := &sendRequest{
+ ctx: ctx,
+ msg: msg,
+ callback: callback,
+ flushImmediately: false,
+ }
+ p.eventsChan <- sr
}
func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
p.publishSemaphore.Acquire()
- p.eventsChan <- &sendRequest{ctx, msg, callback, flushImmediately}
+ sr := &sendRequest{
+ ctx: ctx,
+ msg: msg,
+ callback: callback,
+ flushImmediately: flushImmediately,
+ }
+ p.eventsChan <- sr
}
func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 04545c1..1f29f7f 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -87,7 +87,7 @@ func addSingleMessageToBatch(wb Buffer, smm *pb.SingleMessageMetadata, payload [
wb.Write(payload)
}
-func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payload []byte, err error) {
+func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payloadList [][]byte, err error) {
// reusable buffer for 4-byte uint32s
buf32 := make([]byte, 4)
r := bytes.NewReader(headersAndPayload)
@@ -164,33 +164,63 @@ func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payloa
return nil, nil, err
}
+ numMsg := msgMeta.GetNumMessagesInBatch()
+
+ if numMsg > 0 && msgMeta.NumMessagesInBatch != nil {
+ payloads := make([]byte, lr.N)
+ if _, err = io.ReadFull(lr, payloads); err != nil {
+ return nil, nil, err
+ }
+
+ singleMessages, err := decodeBatchPayload(payloads, numMsg)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ payloadList = make([][]byte, 0, numMsg)
+ for _, singleMsg := range singleMessages {
+ msgMeta.PartitionKey = singleMsg.SingleMeta.PartitionKey
+ msgMeta.Properties = singleMsg.SingleMeta.Properties
+ msgMeta.EventTime = singleMsg.SingleMeta.EventTime
+ payloadList = append(payloadList, singleMsg.SinglePayload)
+ }
+
+ if err := computeChecksum(chksum, expectedChksum); err != nil {
+ return nil, nil, err
+ }
+ return msgMeta, payloadList, nil
+ }
// Anything left in the frame is considered
// the payload and can be any sequence of bytes.
- payloads := make([]byte, lr.N)
- if _, err = io.ReadFull(lr, payloads); err != nil {
- return nil, nil, err
- }
+ payloadList = make([][]byte, 0, 10)
+ if lr.N > 0 {
+ // guard against allocating large buffer
+ if lr.N > MaxFrameSize {
+ return nil, nil, fmt.Errorf("frame payload size (%d) cannot be greater than max frame size (%d)", lr.N, MaxFrameSize)
+ }
- numMsg := msgMeta.GetNumMessagesInBatch()
+ payload := make([]byte, lr.N)
+ if _, err = io.ReadFull(lr, payload); err != nil {
+ return nil, nil, err
+ }
- singleMessages, err := decodeBatchPayload(payloads, numMsg)
- if err != nil {
- return nil, nil, err
+ payloadList = append(payloadList, payload)
}
- for _, singleMsg := range singleMessages {
- payload = singleMsg.SinglePayload
- msgMeta.PartitionKey = singleMsg.SingleMeta.PartitionKey
- msgMeta.Properties = singleMsg.SingleMeta.Properties
- msgMeta.EventTime = singleMsg.SingleMeta.EventTime
+ if err := computeChecksum(chksum, expectedChksum); err != nil {
+ return nil, nil, err
}
- if computed := chksum.compute(); !bytes.Equal(computed, expectedChksum) {
- return nil, nil, fmt.Errorf("checksum mismatch: computed (0x%X) does "+
+ return msgMeta, payloadList, nil
+}
+
+func computeChecksum(chksum CheckSum, expectedChksum []byte) error {
+ computed := chksum.compute()
+ if !bytes.Equal(computed, expectedChksum) {
+ return fmt.Errorf("checksum mismatch: computed (0x%X) does "+
"not match given checksum (0x%X)", computed, expectedChksum)
}
-
- return msgMeta, payload, nil
+ return nil
}
func serializeBatch(wb Buffer, cmdSend *pb.BaseCommand, msgMetadata *pb.MessageMetadata, payload []byte) {
@@ -252,7 +282,7 @@ type singleMessage struct {
func decodeBatchPayload(bp []byte, batchNum int32) ([]*singleMessage, error) {
buf32 := make([]byte, 4)
rdBuf := bytes.NewReader(bp)
- list := make([]*singleMessage, 0, batchNum)
+ singleMsgList := make([]*singleMessage, 0, batchNum)
for i := int32(0); i < batchNum; i++ {
// singleMetaSize
if _, err := io.ReadFull(rdBuf, buf32); err != nil {
@@ -274,13 +304,15 @@ func decodeBatchPayload(bp []byte, batchNum int32) ([]*singleMessage, error) {
if _, err := io.ReadFull(rdBuf, singlePayload); err != nil {
return nil, err
}
- d := &singleMessage{}
- d.SingleMetaSize = singleMetaSize
- d.SingleMeta = singleMeta
- d.SinglePayload = singlePayload
- list = append(list, d)
+ singleMsg := &singleMessage{
+ SingleMetaSize: singleMetaSize,
+ SingleMeta: singleMeta,
+ SinglePayload: singlePayload,
+ }
+
+ singleMsgList = append(singleMsgList, singleMsg)
}
- return list, nil
+ return singleMsgList, nil
}
// ConvertFromStringMap convert a string map to a KeyValue []byte
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index d83f30b..2c707ea 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -64,7 +64,7 @@ type Connection interface {
}
type ConsumerHandler interface {
- HandlerMessage(response *pb.CommandMessage, headersAndPayload []byte) error
+ MessageReceived(response *pb.CommandMessage, headersAndPayload []byte) error
}
type connectionState int
@@ -131,7 +131,7 @@ func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSO
incomingRequests: make(chan *request),
writeRequests: make(chan []byte),
listeners: make(map[uint64]ConnectionListener),
- connWrapper: NewConnWrapper(),
+ connWrapper: NewConnWrapper(),
}
cnx.reader = newConnectionReader(cnx)
cnx.cond = sync.NewCond(cnx)
@@ -307,7 +307,7 @@ func (c *connection) writeCommand(cmd proto.Message) {
func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []byte) {
c.log.Debugf("Received command: %s -- payload: %v", cmd, headersAndPayload)
c.lastDataReceivedTime = time.Now()
- var err error
+ var err error
switch *cmd.Type {
case pb.BaseCommand_SUCCESS:
@@ -344,7 +344,7 @@ func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []by
case pb.BaseCommand_SEND_ERROR:
case pb.BaseCommand_MESSAGE:
- err = c.handleMessage(cmd.GetMessage(), headersAndPayload)
+ err = c.handleMessage(cmd.GetMessage(), headersAndPayload)
case pb.BaseCommand_PING:
c.handlePing()
case pb.BaseCommand_PONG:
@@ -353,9 +353,9 @@ func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []by
case pb.BaseCommand_ACTIVE_CONSUMER_CHANGE:
default:
- if err != nil {
- c.log.Errorf("Received invalid command type: %s", cmd.Type)
- }
+ if err != nil {
+ c.log.Errorf("Received invalid command type: %s", cmd.Type)
+ }
c.Close()
}
}
@@ -403,18 +403,18 @@ func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
}
func (c *connection) handleMessage(response *pb.CommandMessage, payload []byte) error {
- c.log.Debug("Got Message: ", response)
- consumerId := response.GetConsumerId()
- if consumer, ok := c.connWrapper.Consumers[consumerId]; ok {
- err := consumer.HandlerMessage(response, payload)
- if err != nil {
- c.log.WithField("consumerId", consumerId).Error("handle message err: ", response.MessageId)
- return errors.New("handler not found")
- }
- } else {
- c.log.WithField("consumerId", consumerId).Warn("Got unexpected message: ", response.MessageId)
- }
- return nil
+ c.log.Debug("Got Message: ", response)
+ consumerId := response.GetConsumerId()
+ if consumer, ok := c.connWrapper.Consumers[consumerId]; ok {
+ err := consumer.MessageReceived(response, payload)
+ if err != nil {
+ c.log.WithField("consumerId", consumerId).Error("handle message err: ", response.MessageId)
+ return errors.New("handler not found")
+ }
+ } else {
+ c.log.WithField("consumerId", consumerId).Warn("Got unexpected message: ", response.MessageId)
+ }
+ return nil
}
func (c *connection) sendPing() {
@@ -522,8 +522,8 @@ func (c *connection) getTLSConfig() (*tls.Config, error) {
}
type ConnWrapper struct {
- Rwmu sync.RWMutex
- Consumers map[uint64]ConsumerHandler
+ Rwmu sync.RWMutex
+ Consumers map[uint64]ConsumerHandler
}
func NewConnWrapper() *ConnWrapper {
@@ -533,13 +533,13 @@ func NewConnWrapper() *ConnWrapper {
}
func (c *connection) AddConsumeHandler(id uint64, handler ConsumerHandler) {
- c.connWrapper.Rwmu.Lock()
- c.connWrapper.Consumers[id] = handler
- c.connWrapper.Rwmu.Unlock()
+ c.connWrapper.Rwmu.Lock()
+ c.connWrapper.Consumers[id] = handler
+ c.connWrapper.Rwmu.Unlock()
}
func (c *connection) DeleteConsumeHandler(id uint64) {
- c.connWrapper.Rwmu.Lock()
- delete(c.connWrapper.Consumers, id)
- c.connWrapper.Rwmu.Unlock()
+ c.connWrapper.Rwmu.Lock()
+ delete(c.connWrapper.Consumers, id)
+ c.connWrapper.Rwmu.Unlock()
}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 66683d0..e9327d0 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -20,6 +20,7 @@ package pulsar
import (
"context"
"fmt"
+ "net/http"
"sync"
"testing"
"time"
@@ -30,6 +31,55 @@ import (
log "github.com/sirupsen/logrus"
)
+func TestInvalidURL(t *testing.T) {
+ client, err := NewClient(ClientOptions{})
+
+ if client != nil || err == nil {
+ t.Fatal("Should have failed to create client")
+ }
+}
+
+func TestProducerConnectError(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: "pulsar://invalid-hostname:6650",
+ })
+
+ assert.Nil(t, err)
+
+ defer client.Close()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: newTopicName(),
+ })
+
+ // Expect error in creating producer
+ assert.Nil(t, producer)
+ assert.NotNil(t, err)
+
+ assert.Equal(t, err.Error(), "connection error")
+}
+
+func TestProducerNoTopic(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: "pulsar://localhost:6650",
+ })
+
+ if err != nil {
+ t.Fatal(err)
+ return
+ }
+
+ defer client.Close()
+
+ producer, err := client.CreateProducer(ProducerOptions{})
+
+ // Expect error in creating producer
+ assert.Nil(t, producer)
+ assert.NotNil(t, err)
+
+ assert.Equal(t, err.(*Error).Result(), ResultInvalidTopicName)
+}
+
func TestSimpleProducer(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURL,
@@ -92,7 +142,8 @@ func TestProducerAsyncSend(t *testing.T) {
assert.NoError(t, err)
}
- producer.Flush()
+ err = producer.Flush()
+ assert.Nil(t, err)
wg.Wait()
@@ -181,6 +232,213 @@ func TestProducerLastSequenceID(t *testing.T) {
assert.NoError(t, err)
}
+func TestEventTime(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ topicName := "test-event-time"
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "subName",
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ eventTime := timeFromUnixTimestampMillis(uint64(1565161612))
+ err = producer.Send(context.Background(), &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("test-event-time")),
+ EventTime: &eventTime,
+ })
+ assert.Nil(t, err)
+
+ msg, err := consumer.Receive(context.Background())
+ assert.Nil(t, err)
+ actualEventTime := msg.EventTime()
+ assert.Equal(t, eventTime.Unix(), actualEventTime.Unix())
+}
+
+func TestFlushInProducer(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ topicName := "test-flush-in-producer"
+ subName := "subscription-name"
+ numOfMessages := 10
+ ctx := context.Background()
+
+ // batch up to numOfMessages messages, with a max publish delay of 10s
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ DisableBatching: false,
+ BatchingMaxMessages: uint(numOfMessages),
+ BatchingMaxPublishDelay: time.Second * 10,
+ BlockIfQueueFull: true,
+ Properties: map[string]string{
+ "producer-name": "test-producer-name",
+ "producer-id": "test-producer-id",
+ },
+ })
+ defer producer.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: subName,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ prefix := "msg-batch-async"
+ msgCount := 0
+
+ wg := sync.WaitGroup{}
+ wg.Add(5)
+ errors := util.NewBlockingQueue(10)
+ for i := 0; i < numOfMessages/2; i++ {
+ messageContent := prefix + fmt.Sprintf("%d", i)
+ producer.SendAsync(ctx, &ProducerMessage{
+ Payload: []byte(messageContent),
+ }, func(id MessageID, producerMessage *ProducerMessage, e error) {
+ if e != nil {
+ log.WithError(e).Error("Failed to publish")
+ errors.Put(e)
+ } else {
+ log.Info("Published message ", id)
+ }
+ wg.Done()
+ })
+ assert.Nil(t, err)
+ }
+ err = producer.Flush()
+ assert.Nil(t, err)
+ wg.Wait()
+
+ for i := 0; i < numOfMessages/2; i++ {
+ _, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ msgCount++
+ }
+
+ assert.Equal(t, msgCount, numOfMessages/2)
+
+ wg.Add(5)
+ for i := numOfMessages / 2; i < numOfMessages; i++ {
+ messageContent := prefix + fmt.Sprintf("%d", i)
+ producer.SendAsync(ctx, &ProducerMessage{
+ Payload: []byte(messageContent),
+ }, func(id MessageID, producerMessage *ProducerMessage, e error) {
+ if e != nil {
+ log.WithError(e).Error("Failed to publish")
+ errors.Put(e)
+ } else {
+ log.Info("Published message ", id)
+ }
+ wg.Done()
+ })
+ assert.Nil(t, err)
+ }
+
+ err = producer.Flush()
+ assert.Nil(t, err)
+ wg.Wait()
+
+ for i := numOfMessages / 2; i < numOfMessages; i++ {
+ _, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ msgCount++
+ }
+ assert.Equal(t, msgCount, numOfMessages)
+}
+
+func TestFlushInPartitionedProducer(t *testing.T) {
+ topicName := "persistent://public/default/partition-testFlushInPartitionedProducer"
+
+ // call admin api to make it partitioned
+ url := adminURL + "/" + "admin/v2/" + topicName + "/partitions"
+ makeHTTPCall(t, http.MethodPut, url, "5")
+
+ numberOfPartitions := 5
+ numOfMessages := 10
+ ctx := context.Background()
+
+ // create client connection
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ // create consumer
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "my-sub",
+ Type: Exclusive,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ // create producer batching up to numOfMessages/numberOfPartitions messages, with a max publish delay of 10s
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ DisableBatching: false,
+ BatchingMaxMessages: uint(numOfMessages / numberOfPartitions),
+ BatchingMaxPublishDelay: time.Second * 10,
+ BlockIfQueueFull: true,
+ })
+ defer producer.Close()
+
+ // send 5 messages
+ prefix := "msg-batch-async-"
+ wg := sync.WaitGroup{}
+ wg.Add(5)
+ errors := util.NewBlockingQueue(5)
+ for i := 0; i < numOfMessages/2; i++ {
+ messageContent := prefix + fmt.Sprintf("%d", i)
+ producer.SendAsync(ctx, &ProducerMessage{
+ Payload: []byte(messageContent),
+ }, func(id MessageID, producerMessage *ProducerMessage, e error) {
+ if e != nil {
+ log.WithError(e).Error("Failed to publish")
+ errors.Put(e)
+ } else {
+ log.Info("Published message: ", id)
+ }
+ wg.Done()
+ })
+ assert.Nil(t, err)
+ }
+
+ // After flush, should be able to consume.
+ err = producer.Flush()
+ assert.Nil(t, err)
+
+ wg.Wait()
+
+ // Receive all messages
+ msgCount := 0
+ for i := 0; i < numOfMessages/2; i++ {
+ msg, err := consumer.Receive(ctx)
+ fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
+ msg.ID(), string(msg.Payload()))
+ assert.Nil(t, err)
+ err = consumer.Ack(msg)
+ assert.Nil(t, err)
+ msgCount++
+ }
+ assert.Equal(t, msgCount, numOfMessages/2)
+}
+
func TestMessageRouter(t *testing.T) {
// Create topic with 5 partitions
httpPut("http://localhost:8080/admin/v2/persistent/public/default/my-partitioned-topic/partitions", 5)
diff --git a/pulsar/unackedMsgTracker.go b/pulsar/unackedMsgTracker.go
index 8ec51c6..c46b731 100644
--- a/pulsar/unackedMsgTracker.go
+++ b/pulsar/unackedMsgTracker.go
@@ -34,7 +34,6 @@ type UnackedMessageTracker struct {
oldOpenSet set.Set
timeout *time.Ticker
- pc *partitionConsumer
pcs []*partitionConsumer
}
@@ -159,22 +158,7 @@ func (t *UnackedMessageTracker) handlerCmd(ackTimeoutMillis int64) {
t.oldOpenSet.Clear()
- if t.pc != nil {
- requestID := t.pc.client.rpcClient.NewRequestID()
- cmd := &pb.CommandRedeliverUnacknowledgedMessages{
- ConsumerId: proto.Uint64(t.pc.consumerID),
- MessageIds: messageIds,
- }
-
- _, err := t.pc.client.rpcClient.RequestOnCnx(t.pc.cnx, requestID,
- pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, cmd)
- if err != nil {
- t.pc.log.WithError(err).Error("Failed to unsubscribe consumer")
- return
- }
-
- log.Debugf("consumer:%v redeliver messages num:%d", t.pc.consumerName, len(messageIds))
- } else if t.pcs != nil {
+ if t.pcs != nil {
messageIdsMap := make(map[int32][]*pb.MessageIdData)
for _, msgID := range messageIds {
messageIdsMap[msgID.GetPartition()] = append(messageIdsMap[msgID.GetPartition()], msgID)
@@ -198,7 +182,7 @@ func (t *UnackedMessageTracker) handlerCmd(ackTimeoutMillis int64) {
}
}
}
- log.Debug("Tick at ", tick)
+ log.Debugf("Tick at: %v", tick)
}
t.toggle()
diff --git a/util/util.go b/util/util.go
index 06a7e53..bd4f5d6 100644
--- a/util/util.go
+++ b/util/util.go
@@ -18,15 +18,27 @@
package util
import (
- `reflect`
+ "reflect"
)
// IsNil check if the interface is nil
func IsNil(i interface{}) bool {
- vi := reflect.ValueOf(i)
- if vi.Kind() == reflect.Ptr {
- return vi.IsNil()
- }
- return false
+ vi := reflect.ValueOf(i)
+ if vi.Kind() == reflect.Ptr {
+ return vi.IsNil()
+ }
+ return false
}
+// RemoveDuplicateElement returns a new slice with duplicate strings removed, keeping the first occurrence of each element in order.
+func RemoveDuplicateElement(addrs []string) []string {
+ result := make([]string, 0, len(addrs))
+ temp := map[string]struct{}{}
+ for _, item := range addrs {
+ if _, ok := temp[item]; !ok {
+ temp[item] = struct{}{}
+ result = append(result, item)
+ }
+ }
+ return result
+}
diff --git a/util/util_test.go b/util/util_test.go
index 2e1195c..284dd0c 100644
--- a/util/util_test.go
+++ b/util/util_test.go
@@ -18,14 +18,23 @@
package util
import (
- `github.com/stretchr/testify/assert`
- `testing`
+ "fmt"
+ "github.com/stretchr/testify/assert"
+ "strings"
+ "testing"
)
func TestIsNil(t *testing.T) {
- var a interface{} = nil
- var b interface{} = (*int)(nil)
+ var a interface{} = nil
+ var b interface{} = (*int)(nil)
- assert.True(t, a == nil)
- assert.False(t, b == nil)
+ assert.True(t, a == nil)
+ assert.False(t, b == nil)
+}
+
+func TestRemoveDuplicateElement(t *testing.T) {
+ s := []string{"hello", "world", "hello", "golang", "hello", "ruby", "php", "java"}
+ resList := RemoveDuplicateElement(s)
+ res := fmt.Sprintf("%s", resList[:])
+ assert.Equal(t, 1, strings.Count(res, "hello"))
}