You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ad...@apache.org on 2021/05/22 21:30:38 UTC
[pulsar-client-go] branch fix_reader_latest created (now f2b313b)
This is an automated email from the ASF dual-hosted git repository.
addisonj pushed a change to branch fix_reader_latest
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git.
at f2b313b Fix reader latest position
This branch includes the following new commits:
new f2b313b Fix reader latest position
The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[pulsar-client-go] 01/01: Fix reader latest position
Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
addisonj pushed a commit to branch fix_reader_latest
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit f2b313b18d3068d29f6166572e6050a51a1c5bd5
Author: Addison Higham <ad...@gmail.com>
AuthorDate: Sat May 22 15:26:10 2021 -0600
Fix reader latest position
Currently, using reader.latest fails because we discard
all messages less than the startPositionID. This breaks the
reader: every message is less than the latest message ID, so
all messages are filtered out.
---
pulsar/consumer_partition.go | 4 ++++
pulsar/impl_message.go | 15 +++++++++++++++
pulsar/message.go | 5 ++---
3 files changed, 21 insertions(+), 3 deletions(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 031e0a3..5be5dff 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -575,6 +575,10 @@ func (pc *partitionConsumer) messageShouldBeDiscarded(msgID trackingMessageID) b
if pc.startMessageID.Undefined() {
return false
}
+ // if we start at latest message, we should never discard
+ if pc.options.startMessageID.equal(latestMessageId) {
+ return false
+ }
if pc.options.startMessageIDInclusive {
return pc.startMessageID.greater(msgID.messageID)
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index c358c22..c25763d 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -19,6 +19,7 @@ package pulsar
import (
"fmt"
+ "math"
"math/big"
"strings"
"sync"
@@ -36,6 +37,20 @@ type messageID struct {
partitionIdx int32
}
+var latestMessageId messageID = messageID{
+ ledgerID: math.MaxInt64,
+ entryID: math.MaxInt64,
+ batchIdx: -1,
+ partitionIdx: -1,
+}
+
+var earliestMessageId messageID = messageID{
+ ledgerID: -1,
+ entryID: -1,
+ batchIdx: -1,
+ partitionIdx: -1,
+}
+
type trackingMessageID struct {
messageID
diff --git a/pulsar/message.go b/pulsar/message.go
index 397c51e..f26bec0 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -18,7 +18,6 @@
package pulsar
import (
- "math"
"time"
)
@@ -129,10 +128,10 @@ func DeserializeMessageID(data []byte) (MessageID, error) {
// EarliestMessageID returns a messageID that points to the earliest message available in a topic
func EarliestMessageID() MessageID {
- return newMessageID(-1, -1, -1, -1)
+ return earliestMessageId
}
// LatestMessage returns a messageID that points to the latest message
func LatestMessageID() MessageID {
- return newMessageID(math.MaxInt64, math.MaxInt64, -1, -1)
+ return latestMessageId
}