You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ad...@apache.org on 2021/05/22 21:30:38 UTC

[pulsar-client-go] branch fix_reader_latest created (now f2b313b)

This is an automated email from the ASF dual-hosted git repository.

addisonj pushed a change to branch fix_reader_latest
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git.


      at f2b313b  Fix reader latest position

This branch includes the following new commits:

     new f2b313b  Fix reader latest position

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[pulsar-client-go] 01/01: Fix reader latest position

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

addisonj pushed a commit to branch fix_reader_latest
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit f2b313b18d3068d29f6166572e6050a51a1c5bd5
Author: Addison Higham <ad...@gmail.com>
AuthorDate: Sat May 22 15:26:10 2021 -0600

    Fix reader latest position
    
    Currently, using reader.latest fails because we discard every
    message whose ID is less than the startMessageID. When the reader
    starts at the latest position, all messages compare as less than the
    latest message ID, so every message is filtered out.
---
 pulsar/consumer_partition.go |  4 ++++
 pulsar/impl_message.go       | 15 +++++++++++++++
 pulsar/message.go            |  5 ++---
 3 files changed, 21 insertions(+), 3 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 031e0a3..5be5dff 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -575,6 +575,10 @@ func (pc *partitionConsumer) messageShouldBeDiscarded(msgID trackingMessageID) b
 	if pc.startMessageID.Undefined() {
 		return false
 	}
+	// if we start at latest message, we should never discard
+	if pc.options.startMessageID.equal(latestMessageId) {
+		return false
+	}
 
 	if pc.options.startMessageIDInclusive {
 		return pc.startMessageID.greater(msgID.messageID)
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index c358c22..c25763d 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -19,6 +19,7 @@ package pulsar
 
 import (
 	"fmt"
+	"math"
 	"math/big"
 	"strings"
 	"sync"
@@ -36,6 +37,20 @@ type messageID struct {
 	partitionIdx int32
 }
 
+var latestMessageId messageID = messageID{
+	ledgerID:     math.MaxInt64,
+	entryID:      math.MaxInt64,
+	batchIdx:     -1,
+	partitionIdx: -1,
+}
+
+var earliestMessageId messageID = messageID{
+	ledgerID:     -1,
+	entryID:      -1,
+	batchIdx:     -1,
+	partitionIdx: -1,
+}
+
 type trackingMessageID struct {
 	messageID
 
diff --git a/pulsar/message.go b/pulsar/message.go
index 397c51e..f26bec0 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -18,7 +18,6 @@
 package pulsar
 
 import (
-	"math"
 	"time"
 )
 
@@ -129,10 +128,10 @@ func DeserializeMessageID(data []byte) (MessageID, error) {
 
 // EarliestMessageID returns a messageID that points to the earliest message available in a topic
 func EarliestMessageID() MessageID {
-	return newMessageID(-1, -1, -1, -1)
+	return earliestMessageId
 }
 
 // LatestMessage returns a messageID that points to the latest message
 func LatestMessageID() MessageID {
-	return newMessageID(math.MaxInt64, math.MaxInt64, -1, -1)
+	return latestMessageId
 }