You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ad...@apache.org on 2021/05/25 23:21:16 UTC

[pulsar-client-go] branch master updated: [bug] Fix reader latest position (#525)

This is an automated email from the ASF dual-hosted git repository.

addisonj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 07006de  [bug] Fix reader latest position (#525)
07006de is described below

commit 07006dedd6e6e191f1a6e77d9561688bb4b281c0
Author: Addison Higham <ad...@gmail.com>
AuthorDate: Tue May 25 17:21:09 2021 -0600

    [bug] Fix reader latest position (#525)
    
    * Fix reader latest position
    
    Currently, using reader.latest fails because we discard
    all messages less than the startPositionID, this causes an issue with
    the reader as we filter all messages as all messages are less than
    latest message id
    
    * fix lint
    
    * Fix Lint
---
 pulsar/consumer_partition.go |  4 ++++
 pulsar/impl_message.go       | 15 +++++++++++++++
 pulsar/message.go            |  5 ++---
 3 files changed, 21 insertions(+), 3 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 031e0a3..94a914d 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -575,6 +575,10 @@ func (pc *partitionConsumer) messageShouldBeDiscarded(msgID trackingMessageID) b
 	if pc.startMessageID.Undefined() {
 		return false
 	}
+	// if we start at latest message, we should never discard
+	if pc.options.startMessageID.equal(latestMessageID) {
+		return false
+	}
 
 	if pc.options.startMessageIDInclusive {
 		return pc.startMessageID.greater(msgID.messageID)
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index c358c22..a84fc2d 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -19,6 +19,7 @@ package pulsar
 
 import (
 	"fmt"
+	"math"
 	"math/big"
 	"strings"
 	"sync"
@@ -36,6 +37,20 @@ type messageID struct {
 	partitionIdx int32
 }
 
+var latestMessageID = messageID{
+	ledgerID:     math.MaxInt64,
+	entryID:      math.MaxInt64,
+	batchIdx:     -1,
+	partitionIdx: -1,
+}
+
+var earliestMessageID = messageID{
+	ledgerID:     -1,
+	entryID:      -1,
+	batchIdx:     -1,
+	partitionIdx: -1,
+}
+
 type trackingMessageID struct {
 	messageID
 
diff --git a/pulsar/message.go b/pulsar/message.go
index 397c51e..55ecbcc 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -18,7 +18,6 @@
 package pulsar
 
 import (
-	"math"
 	"time"
 )
 
@@ -129,10 +128,10 @@ func DeserializeMessageID(data []byte) (MessageID, error) {
 
 // EarliestMessageID returns a messageID that points to the earliest message available in a topic
 func EarliestMessageID() MessageID {
-	return newMessageID(-1, -1, -1, -1)
+	return earliestMessageID
 }
 
 // LatestMessage returns a messageID that points to the latest message
 func LatestMessageID() MessageID {
-	return newMessageID(math.MaxInt64, math.MaxInt64, -1, -1)
+	return latestMessageID
 }