You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/08/24 06:05:25 UTC

[pulsar-client-go] branch master updated: Add seek logic for reader (#356)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new f9b3c0f  Add seek logic for reader (#356)
f9b3c0f is described below

commit f9b3c0f078dd4e3bf294fe39b03523203c25e990
Author: 冉小龙 <rx...@apache.org>
AuthorDate: Mon Aug 24 14:05:18 2020 +0800

    Add seek logic for reader (#356)
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
    
    ### Motivation
    
    
    Follow up on https://github.com/apache/pulsar-client-go/pull/222 by adding seek logic to the reader.
    
    ### Modifications
    
    - Add `seek by msgID` interface
    - Add `seek by time` interface
    - Add test case
    
    ### Verifying this change
    
    - [x] Make sure that the change passes the CI checks.
---
 pulsar/consumer_partition.go  |  8 +++---
 pulsar/internal/connection.go |  2 +-
 pulsar/reader.go              | 22 ++++++++++++++-
 pulsar/reader_impl.go         | 38 ++++++++++++++++++++++++++
 pulsar/reader_test.go         | 63 ++++++++++++++++++++++++++++++++++++++++---
 5 files changed, 123 insertions(+), 10 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 04a36cd..4d10521 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -207,7 +207,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
 		if msgID.entryID != noMessageEntry {
 			pc.startMessageID = msgID
 
-			err = pc.requestSeek(msgID)
+			err = pc.requestSeek(msgID.messageID)
 			if err != nil {
 				return nil, err
 			}
@@ -276,7 +276,7 @@ func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest)
 	req.msgID, req.err = pc.requestGetLastMessageID()
 }
 
-func (pc *partitionConsumer) requestGetLastMessageID() (messageID, error) {
+func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error) {
 	requestID := pc.client.rpcClient.NewRequestID()
 	cmdGetLastMessageID := &pb.CommandGetLastMessageId{
 		RequestId:  proto.Uint64(requestID),
@@ -286,7 +286,7 @@ func (pc *partitionConsumer) requestGetLastMessageID() (messageID, error) {
 		pb.BaseCommand_GET_LAST_MESSAGE_ID, cmdGetLastMessageID)
 	if err != nil {
 		pc.log.WithError(err).Error("Failed to get last message id")
-		return messageID{}, err
+		return trackingMessageID{}, err
 	}
 	id := res.Response.GetLastMessageIdResponse.GetLastMessageId()
 	return convertToMessageID(id), nil
@@ -365,7 +365,7 @@ func (pc *partitionConsumer) Seek(msgID trackingMessageID) error {
 
 func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
 	defer close(seek.doneCh)
-	seek.err = pc.requestSeek(seek.msgID)
+	seek.err = pc.requestSeek(seek.msgID.messageID)
 }
 
 func (pc *partitionConsumer) requestSeek(msgID messageID) error {
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 8de1ad5..1bfec52 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -665,8 +665,8 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
 }
 
 func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer) {
-	c.log.Infof("Broker notification of Closed consumer: %d", closeConsumer.GetConsumerId())
 	consumerID := closeConsumer.GetConsumerId()
+	c.log.Infof("Broker notification of Closed consumer: %d", consumerID)
 
 	c.Lock()
 	defer c.Unlock()
diff --git a/pulsar/reader.go b/pulsar/reader.go
index 8fe99f8..40234aa 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -17,7 +17,10 @@
 
 package pulsar
 
-import "context"
+import (
+	"context"
+	"time"
+)
 
 // ReaderMessage package Reader and Message as a struct to use
 type ReaderMessage struct {
@@ -88,4 +91,21 @@ type Reader interface {
 
 	// Close the reader and stop the broker to push more messages
 	Close()
+
+	// Reset the subscription associated with this reader to a specific message id.
+	// The message id can either be a specific message or represent the first or last messages in the topic.
+	//
+	// Note: this operation can only be done on non-partitioned topics. For partitioned topics, perform the
+	//       seek() on the individual partitions instead.
+	Seek(MessageID) error
+
+	// Reset the subscription associated with this reader to a specific message publish time.
+	//
+	// Note: this operation can only be done on non-partitioned topics. For partitioned topics, perform the seek()
+	// on the individual partitions instead.
+	//
+	// @param time
+	//            the message publish time to which the subscription is repositioned
+	//
+	SeekByTime(time time.Time) error
 }
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 474d0db..8083b06 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -20,6 +20,7 @@ package pulsar
 import (
 	"context"
 	"fmt"
+	"sync"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
@@ -45,6 +46,7 @@ var (
 )
 
 type reader struct {
+	sync.Mutex
 	pc                  *partitionConsumer
 	messageCh           chan ConsumerMessage
 	lastMessageInBroker trackingMessageID
@@ -187,3 +189,39 @@ func (r *reader) Close() {
 	r.pc.Close()
 	readersClosed.Inc()
 }
+
+func (r *reader) messageID(msgID MessageID) (trackingMessageID, bool) {
+	mid, ok := toTrackingMessageID(msgID)
+	if !ok {
+		r.log.Warnf("invalid message id type %T", msgID)
+		return trackingMessageID{}, false
+	}
+
+	partition := int(mid.partitionIdx)
+	// did we receive a valid partition index?
+	if partition < 0 {
+		r.log.Warnf("invalid partition index %d expected", partition)
+		return trackingMessageID{}, false
+	}
+
+	return mid, true
+}
+
+func (r *reader) Seek(msgID MessageID) error {
+	r.Lock()
+	defer r.Unlock()
+
+	mid, ok := r.messageID(msgID)
+	if !ok {
+		return nil
+	}
+
+	return r.pc.Seek(mid)
+}
+
+func (r *reader) SeekByTime(time time.Time) error {
+	r.Lock()
+	defer r.Unlock()
+
+	return r.pc.SeekByTime(time)
+}
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index 08b949e..793dc8d 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -447,6 +447,65 @@ func TestReaderOnSpecificMessageWithCustomMessageID(t *testing.T) {
 	}
 }
 
+func TestReaderSeek(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topicName := newTopicName()
+	ctx := context.Background()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topicName,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	reader, err := client.CreateReader(ReaderOptions{
+		Topic:          topicName,
+		StartMessageID: EarliestMessageID(),
+	})
+	assert.Nil(t, err)
+	defer reader.Close()
+
+	const N = 10
+	var seekID MessageID
+	for i := 0; i < N; i++ {
+		id, err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		})
+		assert.Nil(t, err)
+
+		if i == 4 {
+			seekID = id
+		}
+	}
+	err = producer.Flush()
+	assert.NoError(t, err)
+
+	for i := 0; i < N; i++ {
+		msg, err := reader.Next(ctx)
+		assert.Nil(t, err)
+		assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
+	}
+
+	err = reader.Seek(seekID)
+	assert.Nil(t, err)
+
+	readerOfSeek, err := client.CreateReader(ReaderOptions{
+		Topic:                   topicName,
+		StartMessageID:          seekID,
+		StartMessageIDInclusive: true,
+	})
+	assert.Nil(t, err)
+
+	msg, err := readerOfSeek.Next(ctx)
+	assert.Nil(t, err)
+	assert.Equal(t, "hello-4", string(msg.Payload()))
+}
+
 func TestReaderLatestInclusiveHasNext(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL: lookupURL,
@@ -498,14 +557,10 @@ func TestReaderLatestInclusiveHasNext(t *testing.T) {
 	assert.Nil(t, err)
 	defer reader.Close()
 
-	var msgID MessageID
 	if reader.HasNext() {
 		msg, err := reader.Next(context.Background())
 		assert.NoError(t, err)
 
 		assert.Equal(t, []byte("hello-9"), msg.Payload())
-		msgID = msg.ID()
 	}
-
-	assert.Equal(t, lastMsgID.Serialize(), msgID.Serialize())
 }