You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/11/03 15:11:22 UTC
[pulsar] branch master updated: [C++] Fix request timeout for
GetLastMessageId doesn't work (#12586)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a54c6c0 [C++] Fix request timeout for GetLastMessageId doesn't work (#12586)
a54c6c0 is described below
commit a54c6c003c626cb16d90200ad81dd3ec37be2133
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Nov 3 23:10:15 2021 +0800
[C++] Fix request timeout for GetLastMessageId doesn't work (#12586)
* Fix request timeout for GetLastMessageId doesn't work
* Fix CentOS 7 build error
* Revert refactors
* Remove redundant clear for listeners
* Use swap instead of move
---
pulsar-client-cpp/lib/ClientConnection.cc | 7 ++-
pulsar-client-cpp/lib/Future.h | 32 +++++++-----
pulsar-client-cpp/tests/PromiseTest.cc | 84 +++++++++++++++++++++++++++++++
3 files changed, 109 insertions(+), 14 deletions(-)
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index 2128689..3ad6f40 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -1620,7 +1620,12 @@ Future<Result, MessageId> ClientConnection::newGetLastMessageId(uint64_t consume
pendingGetLastMessageIdRequests_.insert(std::make_pair(requestId, promise));
lock.unlock();
- sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId), requestId);
+ sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId), requestId)
+ .addListener([promise](Result result, const ResponseData& data) {
+ if (result != ResultOk) {
+ promise.setFailed(result);
+ }
+ });
return promise.getFuture();
}
diff --git a/pulsar-client-cpp/lib/Future.h b/pulsar-client-cpp/lib/Future.h
index cafb63f..b695e5e 100644
--- a/pulsar-client-cpp/lib/Future.h
+++ b/pulsar-client-cpp/lib/Future.h
@@ -90,7 +90,8 @@ class Promise {
public:
Promise() : state_(std::make_shared<InternalState<Result, Type> >()) {}
- bool setValue(const Type& value) {
+ bool setValue(const Type& value) const {
+ static Result DEFAULT_RESULT;
InternalState<Result, Type>* state = state_.get();
Lock lock(state->mutex);
@@ -99,21 +100,24 @@ class Promise {
}
state->value = value;
- state->result = Result();
+ state->result = DEFAULT_RESULT;
state->complete = true;
- typename std::list<ListenerCallback>::iterator it;
- for (it = state->listeners.begin(); it != state->listeners.end(); ++it) {
- ListenerCallback& callback = *it;
- callback(state->result, state->value);
+ decltype(state->listeners) listeners;
+ listeners.swap(state->listeners);
+
+ lock.unlock();
+
+ for (auto& callback : listeners) {
+ callback(DEFAULT_RESULT, value);
}
- state->listeners.clear();
state->condition.notify_all();
return true;
}
- bool setFailed(Result result) {
+ bool setFailed(Result result) const {
+ static Type DEFAULT_VALUE;
InternalState<Result, Type>* state = state_.get();
Lock lock(state->mutex);
@@ -124,13 +128,15 @@ class Promise {
state->result = result;
state->complete = true;
- typename std::list<ListenerCallback>::iterator it;
- for (it = state->listeners.begin(); it != state->listeners.end(); ++it) {
- ListenerCallback& callback = *it;
- callback(state->result, state->value);
+ decltype(state->listeners) listeners;
+ listeners.swap(state->listeners);
+
+ lock.unlock();
+
+ for (auto& callback : listeners) {
+ callback(result, DEFAULT_VALUE);
}
- state->listeners.clear();
state->condition.notify_all();
return true;
}
diff --git a/pulsar-client-cpp/tests/PromiseTest.cc b/pulsar-client-cpp/tests/PromiseTest.cc
new file mode 100644
index 0000000..73c6f8c
--- /dev/null
+++ b/pulsar-client-cpp/tests/PromiseTest.cc
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <lib/Future.h>
+#include <chrono>
+#include <string>
+#include <thread>
+#include <vector>
+
+using namespace pulsar;
+
+TEST(PromiseTest, testSetValue) {
+ Promise<int, std::string> promise;
+ std::thread t{[promise] {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ promise.setValue("hello");
+ }};
+ t.detach();
+
+ std::string value;
+ ASSERT_EQ(promise.getFuture().get(value), 0);
+ ASSERT_EQ(value, "hello");
+}
+
+TEST(PromiseTest, testSetFailed) {
+ Promise<int, std::string> promise;
+ std::thread t{[promise] {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ promise.setFailed(-1);
+ }};
+ t.detach();
+
+ std::string value;
+ ASSERT_EQ(promise.getFuture().get(value), -1);
+ ASSERT_EQ(value, "");
+}
+
+TEST(PromiseTest, testListeners) {
+ Promise<int, std::string> promise;
+ auto future = promise.getFuture();
+
+ bool resultSetFailed = true;
+ bool resultSetValue = true;
+ std::vector<int> results;
+ std::vector<std::string> values;
+
+ future
+ .addListener([promise, &resultSetFailed, &results, &values](int result, const std::string& value) {
+ resultSetFailed = promise.setFailed(-1L);
+ results.emplace_back(result);
+ values.emplace_back(value);
+ })
+ .addListener([promise, &resultSetValue, &results, &values](int result, const std::string& value) {
+ resultSetValue = promise.setValue("WRONG");
+ results.emplace_back(result);
+ values.emplace_back(value);
+ });
+
+ promise.setValue("hello");
+ std::string value;
+ ASSERT_EQ(future.get(value), 0);
+ ASSERT_EQ(value, "hello");
+
+ ASSERT_FALSE(resultSetFailed);
+ ASSERT_FALSE(resultSetValue);
+ ASSERT_EQ(results, (std::vector<int>(2, 0)));
+ ASSERT_EQ(values, (std::vector<std::string>(2, "hello")));
+}