You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/08 01:09:47 UTC

[pulsar] branch master updated: Fixed Reader.HasNext() in Go client (#3764)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 593db1c  Fixed Reader.HasNext() in Go client (#3764)
593db1c is described below

commit 593db1c5e609697c7ea4063387d397dfe6375675
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Mar 7 17:09:37 2019 -0800

    Fixed Reader.HasNext() in Go client (#3764)
    
    * Fixed Reader.HasNext() in Go client
    
    * Fixed formatting
    
    * Removed commented code
---
 pulsar-client-cpp/lib/ConsumerImpl.cc  |  8 +++----
 pulsar-client-cpp/lib/ConsumerImpl.h   |  8 +++----
 pulsar-client-cpp/lib/Reader.cc        |  6 +----
 pulsar-client-go/pulsar/reader_test.go | 44 ++++++++++++++++++++++++++++++++++
 4 files changed, 52 insertions(+), 14 deletions(-)

diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 1c46323..7571d66 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -503,6 +503,7 @@ void ConsumerImpl::internalListener() {
     unAckedMessageTrackerPtr_->add(msg.getMessageId());
     try {
         consumerStatsBasePtr_->receivedMessage(msg, ResultOk);
+        lastDequedMessage_ = Optional<MessageId>::of(msg.getMessageId());
         messageListener_(Consumer(shared_from_this()), msg);
     } catch (const std::exception& e) {
         LOG_ERROR(getName() << "Exception thrown from listener" << e.what());
@@ -1039,8 +1040,7 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
         return;
     }
 
-    BrokerGetLastMessageIdCallback callback1 = [this, lastDequed, callback](Result result,
-                                                                            MessageId messageId) {
+    getLastMessageIdAsync([this, lastDequed, callback](Result result, MessageId messageId) {
         if (result == ResultOk) {
             if (messageId > lastDequed && messageId.entryId() != -1) {
                 callback(ResultOk, true);
@@ -1050,9 +1050,7 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
         } else {
             callback(result, false);
         }
-    };
-
-    getLastMessageIdAsync(callback1);
+    });
 }
 
 void ConsumerImpl::brokerGetLastMessageIdListener(Result res, MessageId messageId,
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index 747c276..b5fe761 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -177,12 +177,12 @@ class ConsumerImpl : public ConsumerImplBase,
     void brokerGetLastMessageIdListener(Result res, MessageId messageId,
                                         BrokerGetLastMessageIdCallback callback);
 
-    MessageId lastMessageIdDequed() {
-        return lastDequedMessage_.is_present() ? lastDequedMessage_.value() : MessageId();
+    const MessageId& lastMessageIdDequed() {
+        return lastDequedMessage_.is_present() ? lastDequedMessage_.value() : MessageId::earliest();
     }
 
-    MessageId lastMessageIdInBroker() {
-        return lastMessageInBroker_.is_present() ? lastMessageInBroker_.value() : MessageId();
+    const MessageId& lastMessageIdInBroker() {
+        return lastMessageInBroker_.is_present() ? lastMessageInBroker_.value() : MessageId::earliest();
     }
 
     friend class PulsarFriend;
diff --git a/pulsar-client-cpp/lib/Reader.cc b/pulsar-client-cpp/lib/Reader.cc
index cd86d62..e14c547 100644
--- a/pulsar-client-cpp/lib/Reader.cc
+++ b/pulsar-client-cpp/lib/Reader.cc
@@ -77,13 +77,9 @@ void Reader::hasMessageAvailableAsync(HasMessageAvailableCallback callback) {
 }
 
 Result Reader::hasMessageAvailable(bool& hasMessageAvailable) {
-    if (!impl_) {
-        return ResultConsumerNotInitialized;
-    }
-
     Promise<Result, bool> promise;
 
-    impl_->hasMessageAvailableAsync(WaitForCallbackValue<bool>(promise));
+    hasMessageAvailableAsync(WaitForCallbackValue<bool>(promise));
     return promise.getFuture().get(hasMessageAvailable);
 }
 
diff --git a/pulsar-client-go/pulsar/reader_test.go b/pulsar-client-go/pulsar/reader_test.go
index 4212617..90821a8 100644
--- a/pulsar-client-go/pulsar/reader_test.go
+++ b/pulsar-client-go/pulsar/reader_test.go
@@ -23,6 +23,7 @@ import (
 	"context"
 	"fmt"
 	"github.com/stretchr/testify/assert"
+	"math/rand"
 	"strings"
 	"testing"
 	"time"
@@ -218,3 +219,46 @@ func TestReaderCompaction(t *testing.T) {
 	assert.Nil(t, msg)
 	assert.NotNil(t, err)
 }
+
+func TestReaderHasNext(t *testing.T) {
+	topic := fmt.Sprintf("TestReaderHasNext-%d", rand.Int())
+	ctx := context.Background()
+
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://localhost:6650",
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topic,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	// Send a message.
+	err = producer.Send(ctx, ProducerMessage{})
+	assert.Nil(t, err)
+
+	reader, err := client.CreateReader(ReaderOptions{
+		Topic:          topic,
+		StartMessageID: EarliestMessage,
+	})
+	assert.Nil(t, err)
+	defer reader.Close()
+
+	var hasNext bool
+
+	// Now we have 1 message to read
+	hasNext, err = reader.HasNext()
+	assert.Nil(t, err)
+	assert.True(t, hasNext)
+
+	_, err = reader.Next(ctx)
+	assert.Nil(t, err)
+
+	// Now there is no message left
+	hasNext, err = reader.HasNext()
+	assert.Nil(t, err)
+	assert.False(t, hasNext)
+}