You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/01/15 01:21:34 UTC

[pulsar-client-go] branch master updated: [issue 438] reader.HasNext() returns true on empty topic (#441)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 2456729  [issue 438] reader.HasNext() returns true on empty topic (#441)
2456729 is described below

commit 2456729a54cad11865005d177698a617db864c72
Author: Ming <it...@gmail.com>
AuthorDate: Thu Jan 14 20:21:26 2021 -0500

    [issue 438] reader.HasNext() returns true on empty topic (#441)
---
 pulsar/impl_message.go |  4 ++++
 pulsar/reader_impl.go  |  6 +++---
 pulsar/reader_test.go  | 20 ++++++++++++++++++++
 3 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 6fc9cad..c358c22 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -71,6 +71,10 @@ func (id trackingMessageID) ack() bool {
 	return true
 }
 
+func (id messageID) isEntryIDValid() bool {
+	return id.entryID >= 0
+}
+
 func (id messageID) greater(other messageID) bool {
 	if id.ledgerID != other.ledgerID {
 		return id.ledgerID > other.ledgerID
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 8caaff4..d76865e 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -161,15 +161,15 @@ func (r *reader) HasNext() bool {
 
 func (r *reader) hasMoreMessages() bool {
 	if !r.pc.lastDequeuedMsg.Undefined() {
-		return r.lastMessageInBroker.greater(r.pc.lastDequeuedMsg.messageID)
+		return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.pc.lastDequeuedMsg.messageID)
 	}
 
 	if r.pc.options.startMessageIDInclusive {
-		return r.lastMessageInBroker.greaterEqual(r.pc.startMessageID.messageID)
+		return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greaterEqual(r.pc.startMessageID.messageID)
 	}
 
 	// Non-inclusive
-	return r.lastMessageInBroker.greater(r.pc.startMessageID.messageID)
+	return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.pc.startMessageID.messageID)
 }
 
 func (r *reader) Close() {
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index 793dc8d..36204cf 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -312,6 +312,26 @@ func TestReaderOnLatestWithBatching(t *testing.T) {
 	cancel()
 }
 
+func TestReaderHasNextAgainstEmptyTopic(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	// create reader on 5th message (not included)
+	reader, err := client.CreateReader(ReaderOptions{
+		Topic:          "an-empty-topic",
+		StartMessageID: EarliestMessageID(),
+	})
+
+	assert.Nil(t, err)
+	defer reader.Close()
+
+	assert.Equal(t, reader.HasNext(), false)
+}
+
 func TestReaderHasNext(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL: lookupURL,