You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/11/12 05:05:02 UTC

[pulsar-client-go] branch master updated: Removed unacked messages tracker (#90)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 367984a  Removed unacked messages tracker (#90)
367984a is described below

commit 367984a8dabec7efc8185fd88a366b1535774725
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Nov 11 21:04:56 2019 -0800

    Removed unacked messages tracker (#90)
    
    * Removed unacked messages tracker
    
    * Fixed tests
---
 pulsar/consumer.go                 |   9 --
 pulsar/consumer_impl.go            |   9 +-
 pulsar/consumer_test.go            |   2 -
 pulsar/unacked_msg_tracker.go      | 199 -------------------------------------
 pulsar/unacked_msg_tracker_test.go |  66 ------------
 5 files changed, 1 insertion(+), 284 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 3b729ec..b0bba1b 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -19,7 +19,6 @@ package pulsar
 
 import (
 	"context"
-	"time"
 )
 
 // Pair of a Consumer and Message
@@ -82,11 +81,6 @@ type ConsumerOptions struct {
 	// This properties will be visible in the topic stats
 	Properties map[string]string
 
-	// Set the timeout for unacked messages
-	// Message not acknowledged within the give time, will be replayed by the broker to the same or a different consumer
-	// Default is 0, which means message are not being replayed based on ack time
-	AckTimeout time.Duration
-
 	// Select the subscription type to be used when subscribing to the topic.
 	// Default is `Exclusive`
 	Type SubscriptionType
@@ -128,9 +122,6 @@ type ConsumerOptions struct {
 
 // Consumer is an interface that abstracts behavior of Pulsar's consumer
 type Consumer interface {
-	// Topic get the topic for the consumer
-	Topic() string
-
 	// Subscription get a subscription for the consumer
 	Subscription() string
 
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index af9b8d5..8b73631 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -34,8 +34,6 @@ import (
 var ErrConsumerClosed = errors.New("consumer closed")
 
 type consumer struct {
-	topic string
-
 	options ConsumerOptions
 
 	consumers []*partitionConsumer
@@ -88,7 +86,6 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
 func topicSubscribe(client *client, options ConsumerOptions, topic string,
 	messageCh chan ConsumerMessage) (Consumer, error) {
 	consumer := &consumer{
-		topic:     topic,
 		messageCh: messageCh,
 		errorCh:   make(chan error),
 		log:       log.WithField("topic", topic),
@@ -165,10 +162,6 @@ func topicSubscribe(client *client, options ConsumerOptions, topic string,
 	return consumer, nil
 }
 
-func (c *consumer) Topic() string {
-	return c.topic
-}
-
 func (c *consumer) Subscription() string {
 	return c.options.SubscriptionName
 }
@@ -177,7 +170,7 @@ func (c *consumer) Unsubscribe() error {
 	var errMsg string
 	for _, consumer := range c.consumers {
 		if err := consumer.Unsubscribe(); err != nil {
-			errMsg += fmt.Sprintf("topic %s, subscription %s: %s", c.Topic(), c.Subscription(), err)
+			errMsg += fmt.Sprintf("topic %s, subscription %s: %s", consumer.topic, c.Subscription(), err)
 		}
 	}
 	if errMsg != "" {
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index bbc1315..33c2f15 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -149,7 +149,6 @@ func TestBatchMessageReceive(t *testing.T) {
 		SubscriptionName: subName,
 	})
 	assert.Nil(t, err)
-	assert.Equal(t, topicName, consumer.Topic())
 	count := 0
 
 	for i := 0; i < numOfMessages; i++ {
@@ -397,7 +396,6 @@ func TestConsumerReceiveTimeout(t *testing.T) {
 		Topic:            topic,
 		SubscriptionName: "my-sub1",
 		Type:             Shared,
-		AckTimeout:       5 * 1000,
 	})
 	assert.Nil(t, err)
 	defer consumer.Close()
diff --git a/pulsar/unacked_msg_tracker.go b/pulsar/unacked_msg_tracker.go
deleted file mode 100644
index 5dfe895..0000000
--- a/pulsar/unacked_msg_tracker.go
+++ /dev/null
@@ -1,199 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package pulsar
-
-import (
-	"sync"
-	"time"
-
-	"github.com/golang/protobuf/proto"
-
-	set "github.com/deckarep/golang-set"
-	log "github.com/sirupsen/logrus"
-
-	"github.com/apache/pulsar-client-go/pkg/pb"
-)
-
-type UnackedMessageTracker struct {
-	cmu        sync.RWMutex // protects following
-	currentSet set.Set
-	oldOpenSet set.Set
-	timeout    *time.Ticker
-
-	pcs []*partitionConsumer
-}
-
-// NewUnackedMessageTracker init UnackedMessageTracker object
-func NewUnackedMessageTracker() *UnackedMessageTracker {
-	unAckTracker := &UnackedMessageTracker{
-		currentSet: set.NewSet(),
-		oldOpenSet: set.NewSet(),
-	}
-
-	return unAckTracker
-}
-
-// Size return the size of current set and old open set cardinality
-func (t *UnackedMessageTracker) Size() int {
-	t.cmu.Lock()
-	defer t.cmu.Unlock()
-
-	return t.currentSet.Cardinality() + t.oldOpenSet.Cardinality()
-}
-
-// IsEmpty check if the currentSet or oldOpenSet are empty.
-func (t *UnackedMessageTracker) IsEmpty() bool {
-	t.cmu.RLock()
-	defer t.cmu.RUnlock()
-
-	return t.currentSet.Cardinality() == 0 && t.oldOpenSet.Cardinality() == 0
-}
-
-// Add will add message id data to currentSet and remove the message id from oldOpenSet.
-func (t *UnackedMessageTracker) Add(id *pb.MessageIdData) bool {
-	t.cmu.Lock()
-	defer t.cmu.Unlock()
-
-	t.oldOpenSet.Remove(id)
-	return t.currentSet.Add(id)
-}
-
-// Remove will remove message id data from currentSet and oldOpenSet
-func (t *UnackedMessageTracker) Remove(id *pb.MessageIdData) {
-	t.cmu.Lock()
-	defer t.cmu.Unlock()
-
-	t.currentSet.Remove(id)
-	t.oldOpenSet.Remove(id)
-}
-
-func (t *UnackedMessageTracker) clear() {
-	t.cmu.Lock()
-	defer t.cmu.Unlock()
-
-	t.currentSet.Clear()
-	t.oldOpenSet.Clear()
-}
-
-func (t *UnackedMessageTracker) toggle() {
-	t.cmu.Lock()
-	defer t.cmu.Unlock()
-
-	t.currentSet, t.oldOpenSet = t.oldOpenSet, t.currentSet
-}
-
-func (t *UnackedMessageTracker) isAckTimeout() bool {
-	t.cmu.RLock()
-	defer t.cmu.RUnlock()
-
-	return !(t.oldOpenSet.Cardinality() == 0)
-}
-
-func (t *UnackedMessageTracker) lessThanOrEqual(id1, id2 pb.MessageIdData) bool {
-	return id1.GetPartition() == id2.GetPartition() &&
-		(id1.GetLedgerId() < id2.GetLedgerId() || id1.GetEntryId() <= id2.GetEntryId())
-}
-
-func (t *UnackedMessageTracker) RemoveMessagesTill(id pb.MessageIdData) int {
-	t.cmu.Lock()
-	defer t.cmu.Unlock()
-
-	counter := 0
-
-	t.currentSet.Each(func(elem interface{}) bool {
-		if t.lessThanOrEqual(elem.(pb.MessageIdData), id) {
-			t.currentSet.Remove(elem)
-			counter++
-		}
-		return true
-	})
-
-	t.oldOpenSet.Each(func(elem interface{}) bool {
-		if t.lessThanOrEqual(elem.(pb.MessageIdData), id) {
-			t.currentSet.Remove(elem)
-			counter++
-		}
-		return true
-	})
-
-	return counter
-}
-
-func (t *UnackedMessageTracker) Start(ackTimeoutMillis int64) {
-	t.cmu.Lock()
-	defer t.cmu.Unlock()
-	t.timeout = time.NewTicker((time.Duration(ackTimeoutMillis)) * time.Millisecond)
-
-	go t.handlerCmd()
-}
-
-func (t *UnackedMessageTracker) handlerCmd() {
-	for {
-		select {
-		case tick := <-t.timeout.C:
-			if t.isAckTimeout() {
-				t.cmu.Lock()
-				log.Debugf(" %d messages have timed-out", t.oldOpenSet.Cardinality())
-				messageIds := make([]*pb.MessageIdData, 0)
-
-				t.oldOpenSet.Each(func(i interface{}) bool {
-					messageIds = append(messageIds, i.(*pb.MessageIdData))
-					return false
-				})
-				log.Debugf("messageID length is:%d", len(messageIds))
-
-				t.oldOpenSet.Clear()
-				t.cmu.Unlock()
-
-				if t.pcs != nil {
-					messageIdsMap := make(map[int32][]*pb.MessageIdData)
-					for _, msgID := range messageIds {
-						messageIdsMap[msgID.GetPartition()] = append(messageIdsMap[msgID.GetPartition()], msgID)
-					}
-
-					for index, subConsumer := range t.pcs {
-						if messageIdsMap[int32(index)] != nil {
-							requestID := subConsumer.client.rpcClient.NewRequestID()
-							cmd := &pb.CommandRedeliverUnacknowledgedMessages{
-								ConsumerId: proto.Uint64(subConsumer.consumerID),
-								MessageIds: messageIdsMap[int32(index)],
-							}
-
-							_, err := subConsumer.client.rpcClient.RequestOnCnx(subConsumer.conn, requestID,
-								pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, cmd)
-							if err != nil {
-								subConsumer.log.WithError(err).Error("Failed to unsubscribe consumer")
-								return
-							}
-						}
-					}
-				}
-			}
-			log.Debugf("Tick at: %v", tick)
-		}
-
-		t.toggle()
-	}
-}
-
-func (t *UnackedMessageTracker) Stop() {
-	t.timeout.Stop()
-	log.Debug("stop ticker ", t.timeout)
-
-	t.clear()
-}
diff --git a/pulsar/unacked_msg_tracker_test.go b/pulsar/unacked_msg_tracker_test.go
deleted file mode 100644
index 3848ce9..0000000
--- a/pulsar/unacked_msg_tracker_test.go
+++ /dev/null
@@ -1,66 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package pulsar
-
-import (
-	"testing"
-
-	"github.com/golang/protobuf/proto"
-	"github.com/stretchr/testify/assert"
-
-	"github.com/apache/pulsar-client-go/pkg/pb"
-)
-
-func TestUnackedMessageTracker(t *testing.T) {
-	unAckTracker := NewUnackedMessageTracker()
-
-	var msgIDs []*pb.MessageIdData
-
-	for i := 0; i < 5; i++ {
-		msgID := &pb.MessageIdData{
-			LedgerId:   proto.Uint64(1),
-			EntryId:    proto.Uint64(uint64(i)),
-			Partition:  proto.Int32(-1),
-			BatchIndex: proto.Int32(-1),
-		}
-
-		msgIDs = append(msgIDs, msgID)
-	}
-
-	for _, msgID := range msgIDs {
-		ok := unAckTracker.Add(msgID)
-		assert.True(t, ok)
-	}
-
-	flag := unAckTracker.IsEmpty()
-	assert.False(t, flag)
-
-	num := unAckTracker.Size()
-	assert.Equal(t, num, 5)
-
-	for index, msgID := range msgIDs {
-		unAckTracker.Remove(msgID)
-		assert.Equal(t, 4-index, unAckTracker.Size())
-	}
-
-	num = unAckTracker.Size()
-	assert.Equal(t, num, 0)
-
-	flag = unAckTracker.IsEmpty()
-	assert.True(t, flag)
-}