You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/11/12 05:05:02 UTC
[pulsar-client-go] branch master updated: Removed unacked messages
tracker (#90)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 367984a Removed unacked messages tracker (#90)
367984a is described below
commit 367984a8dabec7efc8185fd88a366b1535774725
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Nov 11 21:04:56 2019 -0800
Removed unacked messages tracker (#90)
* Removed unacked messages tracker
* Fixed tests
---
pulsar/consumer.go | 9 --
pulsar/consumer_impl.go | 9 +-
pulsar/consumer_test.go | 2 -
pulsar/unacked_msg_tracker.go | 199 -------------------------------------
pulsar/unacked_msg_tracker_test.go | 66 ------------
5 files changed, 1 insertion(+), 284 deletions(-)
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 3b729ec..b0bba1b 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -19,7 +19,6 @@ package pulsar
import (
"context"
- "time"
)
// Pair of a Consumer and Message
@@ -82,11 +81,6 @@ type ConsumerOptions struct {
// This properties will be visible in the topic stats
Properties map[string]string
- // Set the timeout for unacked messages
- // Message not acknowledged within the give time, will be replayed by the broker to the same or a different consumer
- // Default is 0, which means message are not being replayed based on ack time
- AckTimeout time.Duration
-
// Select the subscription type to be used when subscribing to the topic.
// Default is `Exclusive`
Type SubscriptionType
@@ -128,9 +122,6 @@ type ConsumerOptions struct {
// Consumer is an interface that abstracts behavior of Pulsar's consumer
type Consumer interface {
- // Topic get the topic for the consumer
- Topic() string
-
// Subscription get a subscription for the consumer
Subscription() string
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index af9b8d5..8b73631 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -34,8 +34,6 @@ import (
var ErrConsumerClosed = errors.New("consumer closed")
type consumer struct {
- topic string
-
options ConsumerOptions
consumers []*partitionConsumer
@@ -88,7 +86,6 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
func topicSubscribe(client *client, options ConsumerOptions, topic string,
messageCh chan ConsumerMessage) (Consumer, error) {
consumer := &consumer{
- topic: topic,
messageCh: messageCh,
errorCh: make(chan error),
log: log.WithField("topic", topic),
@@ -165,10 +162,6 @@ func topicSubscribe(client *client, options ConsumerOptions, topic string,
return consumer, nil
}
-func (c *consumer) Topic() string {
- return c.topic
-}
-
func (c *consumer) Subscription() string {
return c.options.SubscriptionName
}
@@ -177,7 +170,7 @@ func (c *consumer) Unsubscribe() error {
var errMsg string
for _, consumer := range c.consumers {
if err := consumer.Unsubscribe(); err != nil {
- errMsg += fmt.Sprintf("topic %s, subscription %s: %s", c.Topic(), c.Subscription(), err)
+ errMsg += fmt.Sprintf("topic %s, subscription %s: %s", consumer.topic, c.Subscription(), err)
}
}
if errMsg != "" {
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index bbc1315..33c2f15 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -149,7 +149,6 @@ func TestBatchMessageReceive(t *testing.T) {
SubscriptionName: subName,
})
assert.Nil(t, err)
- assert.Equal(t, topicName, consumer.Topic())
count := 0
for i := 0; i < numOfMessages; i++ {
@@ -397,7 +396,6 @@ func TestConsumerReceiveTimeout(t *testing.T) {
Topic: topic,
SubscriptionName: "my-sub1",
Type: Shared,
- AckTimeout: 5 * 1000,
})
assert.Nil(t, err)
defer consumer.Close()
diff --git a/pulsar/unacked_msg_tracker.go b/pulsar/unacked_msg_tracker.go
deleted file mode 100644
index 5dfe895..0000000
--- a/pulsar/unacked_msg_tracker.go
+++ /dev/null
@@ -1,199 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package pulsar
-
-import (
- "sync"
- "time"
-
- "github.com/golang/protobuf/proto"
-
- set "github.com/deckarep/golang-set"
- log "github.com/sirupsen/logrus"
-
- "github.com/apache/pulsar-client-go/pkg/pb"
-)
-
-type UnackedMessageTracker struct {
- cmu sync.RWMutex // protects following
- currentSet set.Set
- oldOpenSet set.Set
- timeout *time.Ticker
-
- pcs []*partitionConsumer
-}
-
-// NewUnackedMessageTracker init UnackedMessageTracker object
-func NewUnackedMessageTracker() *UnackedMessageTracker {
- unAckTracker := &UnackedMessageTracker{
- currentSet: set.NewSet(),
- oldOpenSet: set.NewSet(),
- }
-
- return unAckTracker
-}
-
-// Size return the size of current set and old open set cardinality
-func (t *UnackedMessageTracker) Size() int {
- t.cmu.Lock()
- defer t.cmu.Unlock()
-
- return t.currentSet.Cardinality() + t.oldOpenSet.Cardinality()
-}
-
-// IsEmpty check if the currentSet or oldOpenSet are empty.
-func (t *UnackedMessageTracker) IsEmpty() bool {
- t.cmu.RLock()
- defer t.cmu.RUnlock()
-
- return t.currentSet.Cardinality() == 0 && t.oldOpenSet.Cardinality() == 0
-}
-
-// Add will add message id data to currentSet and remove the message id from oldOpenSet.
-func (t *UnackedMessageTracker) Add(id *pb.MessageIdData) bool {
- t.cmu.Lock()
- defer t.cmu.Unlock()
-
- t.oldOpenSet.Remove(id)
- return t.currentSet.Add(id)
-}
-
-// Remove will remove message id data from currentSet and oldOpenSet
-func (t *UnackedMessageTracker) Remove(id *pb.MessageIdData) {
- t.cmu.Lock()
- defer t.cmu.Unlock()
-
- t.currentSet.Remove(id)
- t.oldOpenSet.Remove(id)
-}
-
-func (t *UnackedMessageTracker) clear() {
- t.cmu.Lock()
- defer t.cmu.Unlock()
-
- t.currentSet.Clear()
- t.oldOpenSet.Clear()
-}
-
-func (t *UnackedMessageTracker) toggle() {
- t.cmu.Lock()
- defer t.cmu.Unlock()
-
- t.currentSet, t.oldOpenSet = t.oldOpenSet, t.currentSet
-}
-
-func (t *UnackedMessageTracker) isAckTimeout() bool {
- t.cmu.RLock()
- defer t.cmu.RUnlock()
-
- return !(t.oldOpenSet.Cardinality() == 0)
-}
-
-func (t *UnackedMessageTracker) lessThanOrEqual(id1, id2 pb.MessageIdData) bool {
- return id1.GetPartition() == id2.GetPartition() &&
- (id1.GetLedgerId() < id2.GetLedgerId() || id1.GetEntryId() <= id2.GetEntryId())
-}
-
-func (t *UnackedMessageTracker) RemoveMessagesTill(id pb.MessageIdData) int {
- t.cmu.Lock()
- defer t.cmu.Unlock()
-
- counter := 0
-
- t.currentSet.Each(func(elem interface{}) bool {
- if t.lessThanOrEqual(elem.(pb.MessageIdData), id) {
- t.currentSet.Remove(elem)
- counter++
- }
- return true
- })
-
- t.oldOpenSet.Each(func(elem interface{}) bool {
- if t.lessThanOrEqual(elem.(pb.MessageIdData), id) {
- t.currentSet.Remove(elem)
- counter++
- }
- return true
- })
-
- return counter
-}
-
-func (t *UnackedMessageTracker) Start(ackTimeoutMillis int64) {
- t.cmu.Lock()
- defer t.cmu.Unlock()
- t.timeout = time.NewTicker((time.Duration(ackTimeoutMillis)) * time.Millisecond)
-
- go t.handlerCmd()
-}
-
-func (t *UnackedMessageTracker) handlerCmd() {
- for {
- select {
- case tick := <-t.timeout.C:
- if t.isAckTimeout() {
- t.cmu.Lock()
- log.Debugf(" %d messages have timed-out", t.oldOpenSet.Cardinality())
- messageIds := make([]*pb.MessageIdData, 0)
-
- t.oldOpenSet.Each(func(i interface{}) bool {
- messageIds = append(messageIds, i.(*pb.MessageIdData))
- return false
- })
- log.Debugf("messageID length is:%d", len(messageIds))
-
- t.oldOpenSet.Clear()
- t.cmu.Unlock()
-
- if t.pcs != nil {
- messageIdsMap := make(map[int32][]*pb.MessageIdData)
- for _, msgID := range messageIds {
- messageIdsMap[msgID.GetPartition()] = append(messageIdsMap[msgID.GetPartition()], msgID)
- }
-
- for index, subConsumer := range t.pcs {
- if messageIdsMap[int32(index)] != nil {
- requestID := subConsumer.client.rpcClient.NewRequestID()
- cmd := &pb.CommandRedeliverUnacknowledgedMessages{
- ConsumerId: proto.Uint64(subConsumer.consumerID),
- MessageIds: messageIdsMap[int32(index)],
- }
-
- _, err := subConsumer.client.rpcClient.RequestOnCnx(subConsumer.conn, requestID,
- pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, cmd)
- if err != nil {
- subConsumer.log.WithError(err).Error("Failed to unsubscribe consumer")
- return
- }
- }
- }
- }
- }
- log.Debugf("Tick at: %v", tick)
- }
-
- t.toggle()
- }
-}
-
-func (t *UnackedMessageTracker) Stop() {
- t.timeout.Stop()
- log.Debug("stop ticker ", t.timeout)
-
- t.clear()
-}
diff --git a/pulsar/unacked_msg_tracker_test.go b/pulsar/unacked_msg_tracker_test.go
deleted file mode 100644
index 3848ce9..0000000
--- a/pulsar/unacked_msg_tracker_test.go
+++ /dev/null
@@ -1,66 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package pulsar
-
-import (
- "testing"
-
- "github.com/golang/protobuf/proto"
- "github.com/stretchr/testify/assert"
-
- "github.com/apache/pulsar-client-go/pkg/pb"
-)
-
-func TestUnackedMessageTracker(t *testing.T) {
- unAckTracker := NewUnackedMessageTracker()
-
- var msgIDs []*pb.MessageIdData
-
- for i := 0; i < 5; i++ {
- msgID := &pb.MessageIdData{
- LedgerId: proto.Uint64(1),
- EntryId: proto.Uint64(uint64(i)),
- Partition: proto.Int32(-1),
- BatchIndex: proto.Int32(-1),
- }
-
- msgIDs = append(msgIDs, msgID)
- }
-
- for _, msgID := range msgIDs {
- ok := unAckTracker.Add(msgID)
- assert.True(t, ok)
- }
-
- flag := unAckTracker.IsEmpty()
- assert.False(t, flag)
-
- num := unAckTracker.Size()
- assert.Equal(t, num, 5)
-
- for index, msgID := range msgIDs {
- unAckTracker.Remove(msgID)
- assert.Equal(t, 4-index, unAckTracker.Size())
- }
-
- num = unAckTracker.Size()
- assert.Equal(t, num, 0)
-
- flag = unAckTracker.IsEmpty()
- assert.True(t, flag)
-}