You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/11/02 16:44:05 UTC

[GitHub] [pulsar] merlimat commented on a change in pull request #12586: [C++] Fix request timeout for GetLastMessageId doesn't work

merlimat commented on a change in pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586#discussion_r741255837



##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -31,121 +32,138 @@ typedef std::unique_lock<std::mutex> Lock;
 namespace pulsar {
 
 template <typename Result, typename Type>
-struct InternalState {
-    std::mutex mutex;
-    std::condition_variable condition;
-    Result result;
-    Type value;
-    bool complete;
-
-    std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
    public:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
+    using ListenerCallback = std::function<void(Result, const Type&)>;
 
-    Future& addListener(ListenerCallback callback) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    // There's a bug about the defaulted default constructor for GCC < 4.9.1, so we cannot use
+    // `InternalState() = default` here.
+    InternalState() {}
+    InternalState(const InternalState&) = delete;
+    InternalState& operator=(const InternalState&) = delete;
 
-        if (state->complete) {
+    static Result defaultResult() {
+        static Result result;
+        return result;
+    }
+
+    static Type defaultValue() {
+        static Type value;
+        return value;
+    }
+
+    bool completed() const noexcept { return completed_; }
+
+    void addListener(const ListenerCallback& callback) {
+        Lock lock(mutex_);
+        if (completed_) {
+            const auto result = result_;
+            const auto value = value_;
             lock.unlock();
-            callback(state->result, state->value);
+            callback(result, value);
         } else {
-            state->listeners.push_back(callback);
+            listeners_.emplace_back(callback);
         }
+    }
 
-        return *this;
+    Result wait(Type& value) {
+        Lock lock(mutex_);
+        while (!completed_) {
+            condition_.wait(lock);
+        }
+        value = value_;
+        return result_;
     }
 
-    Result get(Type& result) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    bool complete(const Type& value) {
+        if (completed_) {

Review comment:
       Unless we do a compare-and-set operation (or grab the mutex before checking `completed_`), we have a race condition here: two threads can both pass this check and enter the completion phase more than once.

##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -31,121 +32,138 @@ typedef std::unique_lock<std::mutex> Lock;
 namespace pulsar {
 
 template <typename Result, typename Type>
-struct InternalState {
-    std::mutex mutex;
-    std::condition_variable condition;
-    Result result;
-    Type value;
-    bool complete;
-
-    std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
    public:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
+    using ListenerCallback = std::function<void(Result, const Type&)>;
 
-    Future& addListener(ListenerCallback callback) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    // There's a bug about the defaulted default constructor for GCC < 4.9.1, so we cannot use
+    // `InternalState() = default` here.
+    InternalState() {}
+    InternalState(const InternalState&) = delete;
+    InternalState& operator=(const InternalState&) = delete;
 
-        if (state->complete) {
+    static Result defaultResult() {
+        static Result result;
+        return result;
+    }
+
+    static Type defaultValue() {
+        static Type value;
+        return value;
+    }
+
+    bool completed() const noexcept { return completed_; }
+
+    void addListener(const ListenerCallback& callback) {
+        Lock lock(mutex_);
+        if (completed_) {
+            const auto result = result_;
+            const auto value = value_;
             lock.unlock();
-            callback(state->result, state->value);
+            callback(result, value);
         } else {
-            state->listeners.push_back(callback);
+            listeners_.emplace_back(callback);
         }
+    }
 
-        return *this;
+    Result wait(Type& value) {
+        Lock lock(mutex_);
+        while (!completed_) {
+            condition_.wait(lock);
+        }
+        value = value_;
+        return result_;
     }
 
-    Result get(Type& result) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    bool complete(const Type& value) {
+        if (completed_) {
+            return false;
+        }
+
+        Lock lock(mutex_);
+        value_ = value;
+        completed_ = true;
+        auto listeners = std::move(listeners_);
+        lock.unlock();
 
-        if (!state->complete) {
-            // Wait for result
-            while (!state->complete) {
-                state->condition.wait(lock);
-            }
+        for (auto& callback : listeners) {

Review comment:
       I think we'd still have a problem if listeners are being modified when the future is completed.

##########
File path: pulsar-client-cpp/lib/ClientConnection.cc
##########
@@ -1620,7 +1620,12 @@ Future<Result, MessageId> ClientConnection::newGetLastMessageId(uint64_t consume
 
     pendingGetLastMessageIdRequests_.insert(std::make_pair(requestId, promise));
     lock.unlock();
-    sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId), requestId);
+    sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId), requestId)

Review comment:
       I think we should keep this change separate from the refactoring of the Future class below.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org