You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/08/24 06:05:25 UTC
[pulsar-client-go] branch master updated: Add seek logic for reader (#356)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new f9b3c0f Add seek logic for reader (#356)
f9b3c0f is described below
commit f9b3c0f078dd4e3bf294fe39b03523203c25e990
Author: 冉小龙 <rx...@apache.org>
AuthorDate: Mon Aug 24 14:05:18 2020 +0800
Add seek logic for reader (#356)
Signed-off-by: xiaolong.ran <rx...@apache.org>
### Motivation
Following https://github.com/apache/pulsar-client-go/pull/222, add the seek logic for the reader.
### Modifications
- Add `seek by msgID` interface
- Add `seek by time` interface
- Add test case
### Verifying this change
- [x] Make sure that the change passes the CI checks.
---
pulsar/consumer_partition.go | 8 +++---
pulsar/internal/connection.go | 2 +-
pulsar/reader.go | 22 ++++++++++++++-
pulsar/reader_impl.go | 38 ++++++++++++++++++++++++++
pulsar/reader_test.go | 63 ++++++++++++++++++++++++++++++++++++++++---
5 files changed, 123 insertions(+), 10 deletions(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 04a36cd..4d10521 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -207,7 +207,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
if msgID.entryID != noMessageEntry {
pc.startMessageID = msgID
- err = pc.requestSeek(msgID)
+ err = pc.requestSeek(msgID.messageID)
if err != nil {
return nil, err
}
@@ -276,7 +276,7 @@ func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest)
req.msgID, req.err = pc.requestGetLastMessageID()
}
-func (pc *partitionConsumer) requestGetLastMessageID() (messageID, error) {
+func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error) {
requestID := pc.client.rpcClient.NewRequestID()
cmdGetLastMessageID := &pb.CommandGetLastMessageId{
RequestId: proto.Uint64(requestID),
@@ -286,7 +286,7 @@ func (pc *partitionConsumer) requestGetLastMessageID() (messageID, error) {
pb.BaseCommand_GET_LAST_MESSAGE_ID, cmdGetLastMessageID)
if err != nil {
pc.log.WithError(err).Error("Failed to get last message id")
- return messageID{}, err
+ return trackingMessageID{}, err
}
id := res.Response.GetLastMessageIdResponse.GetLastMessageId()
return convertToMessageID(id), nil
@@ -365,7 +365,7 @@ func (pc *partitionConsumer) Seek(msgID trackingMessageID) error {
func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
defer close(seek.doneCh)
- seek.err = pc.requestSeek(seek.msgID)
+ seek.err = pc.requestSeek(seek.msgID.messageID)
}
func (pc *partitionConsumer) requestSeek(msgID messageID) error {
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 8de1ad5..1bfec52 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -665,8 +665,8 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
}
func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer) {
- c.log.Infof("Broker notification of Closed consumer: %d", closeConsumer.GetConsumerId())
consumerID := closeConsumer.GetConsumerId()
+ c.log.Infof("Broker notification of Closed consumer: %d", consumerID)
c.Lock()
defer c.Unlock()
diff --git a/pulsar/reader.go b/pulsar/reader.go
index 8fe99f8..40234aa 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -17,7 +17,10 @@
package pulsar
-import "context"
+import (
+ "context"
+ "time"
+)
// ReaderMessage package Reader and Message as a struct to use
type ReaderMessage struct {
@@ -88,4 +91,21 @@ type Reader interface {
// Close the reader and stop the broker to push more messages
Close()
+
+ // Reset the subscription associated with this reader to a specific message id.
+ // The message id can either be a specific message or represent the first or last messages in the topic.
+ //
+ // Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
+ // seek() on the individual partitions.
+ Seek(MessageID) error
+
+ // Reset the subscription associated with this reader to a specific message publish time.
+ //
+ // Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on
+ // the individual partitions.
+ //
+ // @param timestamp
+ // the message publish time where to reposition the subscription
+ //
+ SeekByTime(time time.Time) error
}
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 474d0db..8083b06 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -20,6 +20,7 @@ package pulsar
import (
"context"
"fmt"
+ "sync"
"time"
"github.com/prometheus/client_golang/prometheus"
@@ -45,6 +46,7 @@ var (
)
type reader struct {
+ sync.Mutex
pc *partitionConsumer
messageCh chan ConsumerMessage
lastMessageInBroker trackingMessageID
@@ -187,3 +189,39 @@ func (r *reader) Close() {
r.pc.Close()
readersClosed.Inc()
}
+
+func (r *reader) messageID(msgID MessageID) (trackingMessageID, bool) {
+ mid, ok := toTrackingMessageID(msgID)
+ if !ok {
+ r.log.Warnf("invalid message id type %T", msgID)
+ return trackingMessageID{}, false
+ }
+
+ partition := int(mid.partitionIdx)
+ // did we receive a valid partition index?
+ if partition < 0 {
+ r.log.Warnf("invalid partition index %d expected", partition)
+ return trackingMessageID{}, false
+ }
+
+ return mid, true
+}
+
+func (r *reader) Seek(msgID MessageID) error {
+ r.Lock()
+ defer r.Unlock()
+
+ mid, ok := r.messageID(msgID)
+ if !ok {
+ return nil
+ }
+
+ return r.pc.Seek(mid)
+}
+
+func (r *reader) SeekByTime(time time.Time) error {
+ r.Lock()
+ defer r.Unlock()
+
+ return r.pc.SeekByTime(time)
+}
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index 08b949e..793dc8d 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -447,6 +447,65 @@ func TestReaderOnSpecificMessageWithCustomMessageID(t *testing.T) {
}
}
+func TestReaderSeek(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topicName := newTopicName()
+ ctx := context.Background()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ reader, err := client.CreateReader(ReaderOptions{
+ Topic: topicName,
+ StartMessageID: EarliestMessageID(),
+ })
+ assert.Nil(t, err)
+ defer reader.Close()
+
+ const N = 10
+ var seekID MessageID
+ for i := 0; i < N; i++ {
+ id, err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ })
+ assert.Nil(t, err)
+
+ if i == 4 {
+ seekID = id
+ }
+ }
+ err = producer.Flush()
+ assert.NoError(t, err)
+
+ for i := 0; i < N; i++ {
+ msg, err := reader.Next(ctx)
+ assert.Nil(t, err)
+ assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
+ }
+
+ err = reader.Seek(seekID)
+ assert.Nil(t, err)
+
+ readerOfSeek, err := client.CreateReader(ReaderOptions{
+ Topic: topicName,
+ StartMessageID: seekID,
+ StartMessageIDInclusive: true,
+ })
+ assert.Nil(t, err)
+
+ msg, err := readerOfSeek.Next(ctx)
+ assert.Nil(t, err)
+ assert.Equal(t, "hello-4", string(msg.Payload()))
+}
+
func TestReaderLatestInclusiveHasNext(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
@@ -498,14 +557,10 @@ func TestReaderLatestInclusiveHasNext(t *testing.T) {
assert.Nil(t, err)
defer reader.Close()
- var msgID MessageID
if reader.HasNext() {
msg, err := reader.Next(context.Background())
assert.NoError(t, err)
assert.Equal(t, []byte("hello-9"), msg.Payload())
- msgID = msg.ID()
}
-
- assert.Equal(t, lastMsgID.Serialize(), msgID.Serialize())
}