You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/08 01:09:47 UTC
[pulsar] branch master updated: Fixed Reader.HasNext() in Go client
(#3764)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 593db1c Fixed Reader.HasNext() in Go client (#3764)
593db1c is described below
commit 593db1c5e609697c7ea4063387d397dfe6375675
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Mar 7 17:09:37 2019 -0800
Fixed Reader.HasNext() in Go client (#3764)
* Fixed Reader.HasNext() in Go client
* Fixed formatting
* Removed commented code
---
pulsar-client-cpp/lib/ConsumerImpl.cc | 8 +++----
pulsar-client-cpp/lib/ConsumerImpl.h | 8 +++----
pulsar-client-cpp/lib/Reader.cc | 6 +----
pulsar-client-go/pulsar/reader_test.go | 44 ++++++++++++++++++++++++++++++++++
4 files changed, 52 insertions(+), 14 deletions(-)
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 1c46323..7571d66 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -503,6 +503,7 @@ void ConsumerImpl::internalListener() {
unAckedMessageTrackerPtr_->add(msg.getMessageId());
try {
consumerStatsBasePtr_->receivedMessage(msg, ResultOk);
+ lastDequedMessage_ = Optional<MessageId>::of(msg.getMessageId());
messageListener_(Consumer(shared_from_this()), msg);
} catch (const std::exception& e) {
LOG_ERROR(getName() << "Exception thrown from listener" << e.what());
@@ -1039,8 +1040,7 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
return;
}
- BrokerGetLastMessageIdCallback callback1 = [this, lastDequed, callback](Result result,
- MessageId messageId) {
+ getLastMessageIdAsync([this, lastDequed, callback](Result result, MessageId messageId) {
if (result == ResultOk) {
if (messageId > lastDequed && messageId.entryId() != -1) {
callback(ResultOk, true);
@@ -1050,9 +1050,7 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
} else {
callback(result, false);
}
- };
-
- getLastMessageIdAsync(callback1);
+ });
}
void ConsumerImpl::brokerGetLastMessageIdListener(Result res, MessageId messageId,
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index 747c276..b5fe761 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -177,12 +177,12 @@ class ConsumerImpl : public ConsumerImplBase,
void brokerGetLastMessageIdListener(Result res, MessageId messageId,
BrokerGetLastMessageIdCallback callback);
- MessageId lastMessageIdDequed() {
- return lastDequedMessage_.is_present() ? lastDequedMessage_.value() : MessageId();
+ const MessageId& lastMessageIdDequed() {
+ return lastDequedMessage_.is_present() ? lastDequedMessage_.value() : MessageId::earliest();
}
- MessageId lastMessageIdInBroker() {
- return lastMessageInBroker_.is_present() ? lastMessageInBroker_.value() : MessageId();
+ const MessageId& lastMessageIdInBroker() {
+ return lastMessageInBroker_.is_present() ? lastMessageInBroker_.value() : MessageId::earliest();
}
friend class PulsarFriend;
diff --git a/pulsar-client-cpp/lib/Reader.cc b/pulsar-client-cpp/lib/Reader.cc
index cd86d62..e14c547 100644
--- a/pulsar-client-cpp/lib/Reader.cc
+++ b/pulsar-client-cpp/lib/Reader.cc
@@ -77,13 +77,9 @@ void Reader::hasMessageAvailableAsync(HasMessageAvailableCallback callback) {
}
Result Reader::hasMessageAvailable(bool& hasMessageAvailable) {
- if (!impl_) {
- return ResultConsumerNotInitialized;
- }
-
Promise<Result, bool> promise;
- impl_->hasMessageAvailableAsync(WaitForCallbackValue<bool>(promise));
+ hasMessageAvailableAsync(WaitForCallbackValue<bool>(promise));
return promise.getFuture().get(hasMessageAvailable);
}
diff --git a/pulsar-client-go/pulsar/reader_test.go b/pulsar-client-go/pulsar/reader_test.go
index 4212617..90821a8 100644
--- a/pulsar-client-go/pulsar/reader_test.go
+++ b/pulsar-client-go/pulsar/reader_test.go
@@ -23,6 +23,7 @@ import (
"context"
"fmt"
"github.com/stretchr/testify/assert"
+ "math/rand"
"strings"
"testing"
"time"
@@ -218,3 +219,46 @@ func TestReaderCompaction(t *testing.T) {
assert.Nil(t, msg)
assert.NotNil(t, err)
}
+
+func TestReaderHasNext(t *testing.T) {
+ topic := fmt.Sprintf("TestReaderHasNext-%d", rand.Int())
+ ctx := context.Background()
+
+ client, err := NewClient(ClientOptions{
+ URL: "pulsar://localhost:6650",
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ // Send a message.
+ err = producer.Send(ctx, ProducerMessage{})
+ assert.Nil(t, err)
+
+ reader, err := client.CreateReader(ReaderOptions{
+ Topic: topic,
+ StartMessageID: EarliestMessage,
+ })
+ assert.Nil(t, err)
+ defer reader.Close()
+
+ var hasNext bool
+
+ // Now we have 1 message to read
+ hasNext, err = reader.HasNext()
+ assert.Nil(t, err)
+ assert.True(t, hasNext)
+
+ _, err = reader.Next(ctx)
+ assert.Nil(t, err)
+
+ // Now there is no message left
+ hasNext, err = reader.HasNext()
+ assert.Nil(t, err)
+ assert.False(t, hasNext)
+}