You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/06/23 21:24:54 UTC

[pulsar-client-go] branch master updated: [Issue 781][add consumer seek by time on partitioned topic] (#782)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f7041f  [Issue 781][add consumer seek by time on partitioned topic]  (#782)
0f7041f is described below

commit 0f7041ffa9085197aa888ac33d3288a3ed81c57b
Author: Garule Prabhudas <pr...@gmail.com>
AuthorDate: Fri Jun 24 02:54:49 2022 +0530

    [Issue 781][add consumer seek by time on partitioned topic]  (#782)
    
    * seek and every partition of topic and check for error
    
    * use array to store errors instead of channles
    
    * add test case to test seek by time on partitioned topic
    
    * wrap errors
    
    * refactor method
    
    Co-authored-by: PGarule <PG...@fanatics.com>
---
 pulsar/consumer_impl.go | 13 ++++++---
 pulsar/consumer_test.go | 73 +++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 82 insertions(+), 4 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index e887538..2328ca8 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -30,6 +30,7 @@ import (
 	"github.com/apache/pulsar-client-go/pulsar/internal"
 	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
 	"github.com/apache/pulsar-client-go/pulsar/log"
+	pkgerrors "github.com/pkg/errors"
 )
 
 const defaultNackRedeliveryDelay = 1 * time.Minute
@@ -589,11 +590,15 @@ func (c *consumer) Seek(msgID MessageID) error {
 func (c *consumer) SeekByTime(time time.Time) error {
 	c.Lock()
 	defer c.Unlock()
-	if len(c.consumers) > 1 {
-		return newError(SeekFailed, "for partition topic, seek command should perform on the individual partitions")
+	var errs error
+	// run SeekByTime on every partition of topic
+	for _, cons := range c.consumers {
+		if err := cons.SeekByTime(time); err != nil {
+			msg := fmt.Sprintf("unable to SeekByTime for topic=%s subscription=%s", c.topic, c.Subscription())
+			errs = pkgerrors.Wrap(newError(SeekFailed, err.Error()), msg)
+		}
 	}
-
-	return c.consumers[0].SeekByTime(time)
+	return errs
 }
 
 var r = &random{
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 0366884..20d4290 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -3052,3 +3052,76 @@ func TestEncryptDecryptRedeliveryOnFailure(t *testing.T) {
 	assert.NotNil(t, msg)
 	consumer.Ack(msg)
 }
+
+// TestConsumerSeekByTimeOnPartitionedTopic test seek by time on partitioned topic.
+// It is based on existing test case [TestConsumerSeekByTime] but for partitioned topic.
+func TestConsumerSeekByTimeOnPartitionedTopic(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	// Create topic with 5 partitions
+	topicAdminURL := "admin/v2/persistent/public/default/TestSeekByTimeOnPartitionedTopic/partitions"
+	err = httpPut(topicAdminURL, 5)
+	defer httpDelete(topicAdminURL)
+	assert.Nil(t, err)
+
+	topicName := "persistent://public/default/TestSeekByTimeOnPartitionedTopic"
+
+	partitions, err := client.TopicPartitions(topicName)
+	assert.Nil(t, err)
+	assert.Equal(t, len(partitions), 5)
+	for i := 0; i < 5; i++ {
+		assert.Equal(t, partitions[i],
+			fmt.Sprintf("%s-partition-%d", topicName, i))
+	}
+
+	ctx := context.Background()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           topicName,
+		DisableBatching: false,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "my-sub",
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	// Use value bigger than 1000 to full-fill queue channel with size 1000 and message channel with size 10
+	const N = 1100
+	resetTimeStr := "100s"
+	retentionTimeInSecond, err := internal.ParseRelativeTimeInSeconds(resetTimeStr)
+	assert.Nil(t, err)
+
+	for i := 0; i < N; i++ {
+		_, err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		})
+		assert.Nil(t, err)
+	}
+
+	// Don't consume all messages so some stay in queues
+	for i := 0; i < N-20; i++ {
+		msg, err := consumer.Receive(ctx)
+		assert.Nil(t, err)
+		consumer.Ack(msg)
+	}
+
+	currentTimestamp := time.Now()
+	err = consumer.SeekByTime(currentTimestamp.Add(-retentionTimeInSecond))
+	assert.Nil(t, err)
+
+	// should be able to consume all messages once again
+	for i := 0; i < N; i++ {
+		msg, err := consumer.Receive(ctx)
+		assert.Nil(t, err)
+		consumer.Ack(msg)
+	}
+}