You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/01/15 01:21:34 UTC
[pulsar-client-go] branch master updated: [issue 438]
reader.HasNext() returns true on empty topic (#441)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 2456729 [issue 438] reader.HasNext() returns true on empty topic (#441)
2456729 is described below
commit 2456729a54cad11865005d177698a617db864c72
Author: Ming <it...@gmail.com>
AuthorDate: Thu Jan 14 20:21:26 2021 -0500
[issue 438] reader.HasNext() returns true on empty topic (#441)
---
pulsar/impl_message.go | 4 ++++
pulsar/reader_impl.go | 6 +++---
pulsar/reader_test.go | 20 ++++++++++++++++++++
3 files changed, 27 insertions(+), 3 deletions(-)
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 6fc9cad..c358c22 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -71,6 +71,10 @@ func (id trackingMessageID) ack() bool {
return true
}
+func (id messageID) isEntryIDValid() bool {
+ return id.entryID >= 0
+}
+
func (id messageID) greater(other messageID) bool {
if id.ledgerID != other.ledgerID {
return id.ledgerID > other.ledgerID
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 8caaff4..d76865e 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -161,15 +161,15 @@ func (r *reader) HasNext() bool {
func (r *reader) hasMoreMessages() bool {
if !r.pc.lastDequeuedMsg.Undefined() {
- return r.lastMessageInBroker.greater(r.pc.lastDequeuedMsg.messageID)
+ return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.pc.lastDequeuedMsg.messageID)
}
if r.pc.options.startMessageIDInclusive {
- return r.lastMessageInBroker.greaterEqual(r.pc.startMessageID.messageID)
+ return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greaterEqual(r.pc.startMessageID.messageID)
}
// Non-inclusive
- return r.lastMessageInBroker.greater(r.pc.startMessageID.messageID)
+ return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.pc.startMessageID.messageID)
}
func (r *reader) Close() {
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index 793dc8d..36204cf 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -312,6 +312,26 @@ func TestReaderOnLatestWithBatching(t *testing.T) {
cancel()
}
+func TestReaderHasNextAgainstEmptyTopic(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ // create reader on 5th message (not included)
+ reader, err := client.CreateReader(ReaderOptions{
+ Topic: "an-empty-topic",
+ StartMessageID: EarliestMessageID(),
+ })
+
+ assert.Nil(t, err)
+ defer reader.Close()
+
+ assert.Equal(t, reader.HasNext(), false)
+}
+
func TestReaderHasNext(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,