You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/11/04 11:18:58 UTC

[pulsar] 10/14: [C++] Fix request timeout for GetLastMessageId doesn't work (#12586)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4591b884d638951c9dda8ae87bd2c630b191e3ea
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Nov 3 23:10:15 2021 +0800

    [C++] Fix request timeout for GetLastMessageId doesn't work (#12586)
    
    * Fix request timeout for GetLastMessageId doesn't work
    
    * Fix CentOS 7 build error
    
    * Revert refactors
    
    * Remove redundant clear for listeners
    
    * Use swap instead of move
    
    (cherry picked from commit a54c6c003c626cb16d90200ad81dd3ec37be2133)
---
 pulsar-client-cpp/lib/ClientConnection.cc |  7 ++-
 pulsar-client-cpp/lib/Future.h            | 32 +++++++-----
 pulsar-client-cpp/tests/PromiseTest.cc    | 84 +++++++++++++++++++++++++++++++
 3 files changed, 109 insertions(+), 14 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index 6e5245e..21fdd64 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -1597,7 +1597,12 @@ Future<Result, MessageId> ClientConnection::newGetLastMessageId(uint64_t consume
 
     pendingGetLastMessageIdRequests_.insert(std::make_pair(requestId, promise));
     lock.unlock();
-    sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId), requestId);
+    sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId), requestId)
+        .addListener([promise](Result result, const ResponseData& data) {
+            if (result != ResultOk) {
+                promise.setFailed(result);
+            }
+        });
     return promise.getFuture();
 }
 
diff --git a/pulsar-client-cpp/lib/Future.h b/pulsar-client-cpp/lib/Future.h
index cafb63f..b695e5e 100644
--- a/pulsar-client-cpp/lib/Future.h
+++ b/pulsar-client-cpp/lib/Future.h
@@ -90,7 +90,8 @@ class Promise {
    public:
     Promise() : state_(std::make_shared<InternalState<Result, Type> >()) {}
 
-    bool setValue(const Type& value) {
+    bool setValue(const Type& value) const {
+        static Result DEFAULT_RESULT;
         InternalState<Result, Type>* state = state_.get();
         Lock lock(state->mutex);
 
@@ -99,21 +100,24 @@ class Promise {
         }
 
         state->value = value;
-        state->result = Result();
+        state->result = DEFAULT_RESULT;
         state->complete = true;
 
-        typename std::list<ListenerCallback>::iterator it;
-        for (it = state->listeners.begin(); it != state->listeners.end(); ++it) {
-            ListenerCallback& callback = *it;
-            callback(state->result, state->value);
+        decltype(state->listeners) listeners;
+        listeners.swap(state->listeners);
+
+        lock.unlock();
+
+        for (auto& callback : listeners) {
+            callback(DEFAULT_RESULT, value);
         }
 
-        state->listeners.clear();
         state->condition.notify_all();
         return true;
     }
 
-    bool setFailed(Result result) {
+    bool setFailed(Result result) const {
+        static Type DEFAULT_VALUE;
         InternalState<Result, Type>* state = state_.get();
         Lock lock(state->mutex);
 
@@ -124,13 +128,15 @@ class Promise {
         state->result = result;
         state->complete = true;
 
-        typename std::list<ListenerCallback>::iterator it;
-        for (it = state->listeners.begin(); it != state->listeners.end(); ++it) {
-            ListenerCallback& callback = *it;
-            callback(state->result, state->value);
+        decltype(state->listeners) listeners;
+        listeners.swap(state->listeners);
+
+        lock.unlock();
+
+        for (auto& callback : listeners) {
+            callback(result, DEFAULT_VALUE);
         }
 
-        state->listeners.clear();
         state->condition.notify_all();
         return true;
     }
diff --git a/pulsar-client-cpp/tests/PromiseTest.cc b/pulsar-client-cpp/tests/PromiseTest.cc
new file mode 100644
index 0000000..73c6f8c
--- /dev/null
+++ b/pulsar-client-cpp/tests/PromiseTest.cc
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <lib/Future.h>
+#include <chrono>
+#include <string>
+#include <thread>
+#include <vector>
+
+using namespace pulsar;
+
+TEST(PromiseTest, testSetValue) {
+    Promise<int, std::string> promise;
+    std::thread t{[promise] {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        promise.setValue("hello");
+    }};
+    t.detach();
+
+    std::string value;
+    ASSERT_EQ(promise.getFuture().get(value), 0);
+    ASSERT_EQ(value, "hello");
+}
+
+TEST(PromiseTest, testSetFailed) {
+    Promise<int, std::string> promise;
+    std::thread t{[promise] {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        promise.setFailed(-1);
+    }};
+    t.detach();
+
+    std::string value;
+    ASSERT_EQ(promise.getFuture().get(value), -1);
+    ASSERT_EQ(value, "");
+}
+
+TEST(PromiseTest, testListeners) {
+    Promise<int, std::string> promise;
+    auto future = promise.getFuture();
+
+    bool resultSetFailed = true;
+    bool resultSetValue = true;
+    std::vector<int> results;
+    std::vector<std::string> values;
+
+    future
+        .addListener([promise, &resultSetFailed, &results, &values](int result, const std::string& value) {
+            resultSetFailed = promise.setFailed(-1L);
+            results.emplace_back(result);
+            values.emplace_back(value);
+        })
+        .addListener([promise, &resultSetValue, &results, &values](int result, const std::string& value) {
+            resultSetValue = promise.setValue("WRONG");
+            results.emplace_back(result);
+            values.emplace_back(value);
+        });
+
+    promise.setValue("hello");
+    std::string value;
+    ASSERT_EQ(future.get(value), 0);
+    ASSERT_EQ(value, "hello");
+
+    ASSERT_FALSE(resultSetFailed);
+    ASSERT_FALSE(resultSetValue);
+    ASSERT_EQ(results, (std::vector<int>(2, 0)));
+    ASSERT_EQ(values, (std::vector<std::string>(2, "hello")));
+}