You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/06/23 21:24:54 UTC
[pulsar-client-go] branch master updated: [Issue 781][add consumer seek by time on partitioned topic] (#782)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 0f7041f [Issue 781][add consumer seek by time on partitioned topic] (#782)
0f7041f is described below
commit 0f7041ffa9085197aa888ac33d3288a3ed81c57b
Author: Garule Prabhudas <pr...@gmail.com>
AuthorDate: Fri Jun 24 02:54:49 2022 +0530
[Issue 781][add consumer seek by time on partitioned topic] (#782)
* seek on every partition of topic and check for error
* use array to store errors instead of channels
* add test case to test seek by time on partitioned topic
* wrap errors
* refactor method
Co-authored-by: PGarule <PG...@fanatics.com>
---
pulsar/consumer_impl.go | 13 ++++++---
pulsar/consumer_test.go | 73 +++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 82 insertions(+), 4 deletions(-)
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index e887538..2328ca8 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -30,6 +30,7 @@ import (
"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
+ pkgerrors "github.com/pkg/errors"
)
const defaultNackRedeliveryDelay = 1 * time.Minute
@@ -589,11 +590,15 @@ func (c *consumer) Seek(msgID MessageID) error {
func (c *consumer) SeekByTime(time time.Time) error {
c.Lock()
defer c.Unlock()
- if len(c.consumers) > 1 {
- return newError(SeekFailed, "for partition topic, seek command should perform on the individual partitions")
+ var errs error
+ // Run SeekByTime on every partition of the topic; errs keeps only the most recent failure.
+ for _, cons := range c.consumers {
+ if err := cons.SeekByTime(time); err != nil {
+ msg := fmt.Sprintf("unable to SeekByTime for topic=%s subscription=%s", c.topic, c.Subscription())
+ errs = pkgerrors.Wrap(newError(SeekFailed, err.Error()), msg)
+ }
}
-
- return c.consumers[0].SeekByTime(time)
+ return errs
}
var r = &random{
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 0366884..20d4290 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -3052,3 +3052,76 @@ func TestEncryptDecryptRedeliveryOnFailure(t *testing.T) {
assert.NotNil(t, msg)
consumer.Ack(msg)
}
+
+// TestConsumerSeekByTimeOnPartitionedTopic tests seeking by time on a partitioned topic.
+// It is based on the existing test case [TestConsumerSeekByTime], but uses a partitioned topic.
+func TestConsumerSeekByTimeOnPartitionedTopic(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ // Create a topic with 5 partitions
+ topicAdminURL := "admin/v2/persistent/public/default/TestSeekByTimeOnPartitionedTopic/partitions"
+ err = httpPut(topicAdminURL, 5)
+ defer httpDelete(topicAdminURL)
+ assert.Nil(t, err)
+
+ topicName := "persistent://public/default/TestSeekByTimeOnPartitionedTopic"
+
+ partitions, err := client.TopicPartitions(topicName)
+ assert.Nil(t, err)
+ assert.Equal(t, len(partitions), 5)
+ for i := 0; i < 5; i++ {
+ assert.Equal(t, partitions[i],
+ fmt.Sprintf("%s-partition-%d", topicName, i))
+ }
+
+ ctx := context.Background()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ DisableBatching: false,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "my-sub",
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ // Use a value bigger than 1000 to fill up the queue channel (size 1000) and the message channel (size 10)
+ const N = 1100
+ resetTimeStr := "100s"
+ retentionTimeInSecond, err := internal.ParseRelativeTimeInSeconds(resetTimeStr)
+ assert.Nil(t, err)
+
+ for i := 0; i < N; i++ {
+ _, err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ })
+ assert.Nil(t, err)
+ }
+
+ // Consume all but 20 messages so that some stay in the queues
+ for i := 0; i < N-20; i++ {
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ consumer.Ack(msg)
+ }
+
+ currentTimestamp := time.Now()
+ err = consumer.SeekByTime(currentTimestamp.Add(-retentionTimeInSecond))
+ assert.Nil(t, err)
+
+ // After the seek, all N messages should be consumable once again
+ for i := 0; i < N; i++ {
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ consumer.Ack(msg)
+ }
+}