You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/11/02 12:42:48 UTC

[GitHub] [pulsar] BewareMyPower opened a new pull request #12586: [C++] Fix request timeout for GetLastMessageId doesn't work

BewareMyPower opened a new pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586


   ### Motivation
   
   https://github.com/apache/pulsar/pull/11600 adds the timeout for GetLastMessageId request by using `sendRequestWithId` instead of `sendCommand`. However, it's still incorrect. Because when the request timeout exceeds, the future that is completed with `ResultTimeout` is what `sendRequestWithId` returns but not the `promise.getFuture()`. Therefore, even if the request was not responded in `operationTimeout` seconds, the future returned by `newGetLastMessageId` would still be not completed.
   
   Besides, when I tried to complete the `promise` in `sendRequestWithId`'s callback, I found a deadlock issue if `ServerCnx#handleGetLastMessageId` hang forever (I just add a long `sleep` call in this method).
   
   ```
       frame #3: std::__1::mutex::lock() + 9
       frame #4: pulsar::Promise<pulsar::Result, pulsar::ResponseData>::setFailed(pulsar::Result) const [inlined] std::__1::unique_lock<std::__1::mutex>::unique_lock(this=<unavailable>, __m=0x00007ffb1b0044d8) at __mutex_base:119:61 [opt]
       frame #5: pulsar::Promise<pulsar::Result, pulsar::ResponseData>::setFailed(pulsar::Result) const [inlined] std::__1::unique_lock<std::__1::mutex>::unique_lock(this=<unavailable>, __m=0x00007ffb1b0044d8) at __mutex_base:119 [opt]
       frame #6: pulsar::Promise<pulsar::Result, pulsar::ResponseData>::setFailed(this=0x00007ffb1b0046e8, result=ResultConnectError) const at Future.h:118 [opt]
       frame #7: pulsar::ClientConnection::close(this=<unavailable>, result=ResultConnectError) at ClientConnection.cc:1556:27 [opt]
   ```
   
   We can see `Promise::setFailed` stuck in `ClientConnection::close`:
   
   ```c++
        for (auto& kv : pendingRequests) {
            kv.second.promise.setFailed(ResultConnectError);
        }  
   ```
   
   It's because the future's callback is called in `setFailed`. However, the callback also calls `setFailed`, which tries to acquire the same lock that is not reentrant. So deadlock happens.
   
   ### Modifications
   
   Refactor the `Future`/`Promise` infrastructure. The current design is too old and the code style is bad. The important things of the refactoring are:
   1. Change `completed_` field (the original `complete` field) to an atomic variable. So when checking if the future is completed, no lock is required.
   2. Move the triggering of `listeners_` out of the locked code block. So that each listener (callback) will be triggered without acquiring any lock.
   3. Move `conditional_variable::notify_all()` out of the locked code block as well. The notifying thread does not need to hold the lock, see https://en.cppreference.com/w/cpp/thread/condition_variable/notify_all.
   4. Move all related implementations into `InternalState` so that `Future` and `Promise` only need to call them directly.
   
   Then add a `PromiseTest` to protect the refactoring. Based on the refactor, just add a callback to `sendRequestWithId` in `newGetLastMessageId` to make sure the request timeout works.
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   It's hard to simulate the operation timeout. So I have to add the following code to `ServerCnx#handleGetLastMessageId`
   
   ```java
           try {
               Thread.sleep(1000000);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
   ```
   
   and run a reader to call `hasMessageAvailable` with 3 seconds operation timeout. After it failed, the client exited after 6 seconds, which is twice the operation timeout. The logs are:
   
   ```
   2021-11-02 20:39:39.685 ERROR [0x11b16de00] SampleConsumer:48 | Failed to check hasMessageAvailable: TimeOut
   2021-11-02 20:39:39.685 INFO  [0x11b16de00] ClientImpl:492 | Closing Pulsar client with 0 producers and 1 consumers
   2021-11-02 20:39:39.685 INFO  [0x11b16de00] ConsumerImpl:894 | [persistent://public/default/my-topic-0, reader-35479ac30a, 0] Closing consumer for topic persistent://public/default/my-topic-0
   2021-11-02 20:39:43.686 ERROR [0x700006b1e000] ConsumerImpl:952 | [persistent://public/default/my-topic-0, reader-35479ac30a, 0] Failed to close consumer: TimeOut
   2021-11-02 20:39:43.686 INFO  [0x700006b1e000] ClientConnection:1542 | [[::1]:60215 -> [::1]:6650] Connection closed
   2021-11-02 20:39:43.687 INFO  [0x11b16de00] ClientConnection:255 | [[::1]:60215 -> [::1]:6650] Destroyed connection
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on a change in pull request #12586: [C++] Fix request timeout for GetLastMessageId doesn't work

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586#discussion_r741562946



##########
File path: pulsar-client-cpp/lib/ClientConnection.cc
##########
@@ -1620,7 +1620,12 @@ Future<Result, MessageId> ClientConnection::newGetLastMessageId(uint64_t consume
 
     pendingGetLastMessageIdRequests_.insert(std::make_pair(requestId, promise));
     lock.unlock();
-    sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId), requestId);
+    sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId), requestId)

Review comment:
       I agree. I'll remove the unnecessary refactoring of this class but still add some changes.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on a change in pull request #12586: [C++] Fix request timeout for GetLastMessageId doesn't work

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586#discussion_r741628188



##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -99,21 +100,24 @@ class Promise {
         }
 
         state->value = value;
-        state->result = Result();
+        state->result = DEFAULT_RESULT;
         state->complete = true;
 
-        typename std::list<ListenerCallback>::iterator it;
-        for (it = state->listeners.begin(); it != state->listeners.end(); ++it) {
-            ListenerCallback& callback = *it;
-            callback(state->result, state->value);
+        const auto listeners = std::move(state->listeners);
+
+        lock.unlock();
+
+        for (auto& callback : listeners) {
+            callback(DEFAULT_RESULT, value);
         }
 
         state->listeners.clear();

Review comment:
       Sounds reasonable. Ideally we should not use the object that has been moved. I'll adopt the swap way.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat merged pull request #12586: [C++] Fix request timeout for GetLastMessageId doesn't work

Posted by GitBox <gi...@apache.org>.
merlimat merged pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on pull request #12586: [C++] Fix request timeout for GetLastMessageId doesn't work

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586#issuecomment-957745057


   I'll fix the compilation error soon.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat commented on a change in pull request #12586: [C++] Fix request timeout for GetLastMessageId doesn't work

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586#discussion_r741607586



##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -31,121 +32,138 @@ typedef std::unique_lock<std::mutex> Lock;
 namespace pulsar {
 
 template <typename Result, typename Type>
-struct InternalState {
-    std::mutex mutex;
-    std::condition_variable condition;
-    Result result;
-    Type value;
-    bool complete;
-
-    std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
    public:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
+    using ListenerCallback = std::function<void(Result, const Type&)>;
 
-    Future& addListener(ListenerCallback callback) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    // There's a bug about the defaulted default constructor for GCC < 4.9.1, so we cannot use
+    // `InternalState() = default` here.
+    InternalState() {}
+    InternalState(const InternalState&) = delete;
+    InternalState& operator=(const InternalState&) = delete;
 
-        if (state->complete) {
+    static Result defaultResult() {
+        static Result result;
+        return result;
+    }
+
+    static Type defaultValue() {
+        static Type value;
+        return value;
+    }
+
+    bool completed() const noexcept { return completed_; }
+
+    void addListener(const ListenerCallback& callback) {
+        Lock lock(mutex_);
+        if (completed_) {
+            const auto result = result_;
+            const auto value = value_;
             lock.unlock();
-            callback(state->result, state->value);
+            callback(result, value);
         } else {
-            state->listeners.push_back(callback);
+            listeners_.emplace_back(callback);
         }
+    }
 
-        return *this;
+    Result wait(Type& value) {
+        Lock lock(mutex_);
+        while (!completed_) {
+            condition_.wait(lock);
+        }
+        value = value_;
+        return result_;
     }
 
-    Result get(Type& result) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    bool complete(const Type& value) {
+        if (completed_) {
+            return false;
+        }
+
+        Lock lock(mutex_);
+        value_ = value;
+        completed_ = true;
+        auto listeners = std::move(listeners_);
+        lock.unlock();
 
-        if (!state->complete) {
-            // Wait for result
-            while (!state->complete) {
-                state->condition.wait(lock);
-            }
+        for (auto& callback : listeners) {

Review comment:
       Ok, we just need to be careful in making sure that when a listener is added and the future is already completed, the listener needs to be triggered immediately.

##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -99,21 +100,24 @@ class Promise {
         }
 
         state->value = value;
-        state->result = Result();
+        state->result = DEFAULT_RESULT;
         state->complete = true;
 
-        typename std::list<ListenerCallback>::iterator it;
-        for (it = state->listeners.begin(); it != state->listeners.end(); ++it) {
-            ListenerCallback& callback = *it;
-            callback(state->result, state->value);
+        const auto listeners = std::move(state->listeners);
+
+        lock.unlock();
+
+        for (auto& callback : listeners) {
+            callback(DEFAULT_RESULT, value);
         }
 
         state->listeners.clear();

Review comment:
       the `state->listeners.clear()` would have to be done from within the mutex. Probably it might not be needed anymore? In what state is the `std::vector` left after the listeners are moved out?

##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -99,21 +100,24 @@ class Promise {
         }
 
         state->value = value;
-        state->result = Result();
+        state->result = DEFAULT_RESULT;
         state->complete = true;
 
-        typename std::list<ListenerCallback>::iterator it;
-        for (it = state->listeners.begin(); it != state->listeners.end(); ++it) {
-            ListenerCallback& callback = *it;
-            callback(state->result, state->value);
+        const auto listeners = std::move(state->listeners);
+
+        lock.unlock();
+
+        for (auto& callback : listeners) {
+            callback(DEFAULT_RESULT, value);
         }
 
         state->listeners.clear();

Review comment:
       The only thing I’m not 100 sure if it’s safe to keep using the listener vector after it's moved out. Eg. If a new listener is added after the future  is completed, it will add again into the moved vector which is in an undefined state. A I understand, the only guarantee is that the vector will be empty, but we have no idea whether we can add or not. 
   
   Instead of the move, it might actually be safer to do a swap with an empty std::vector.

##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -31,121 +32,138 @@ typedef std::unique_lock<std::mutex> Lock;
 namespace pulsar {
 
 template <typename Result, typename Type>
-struct InternalState {
-    std::mutex mutex;
-    std::condition_variable condition;
-    Result result;
-    Type value;
-    bool complete;
-
-    std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
    public:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
+    using ListenerCallback = std::function<void(Result, const Type&)>;
 
-    Future& addListener(ListenerCallback callback) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    // There's a bug about the defaulted default constructor for GCC < 4.9.1, so we cannot use
+    // `InternalState() = default` here.
+    InternalState() {}
+    InternalState(const InternalState&) = delete;
+    InternalState& operator=(const InternalState&) = delete;
 
-        if (state->complete) {
+    static Result defaultResult() {
+        static Result result;
+        return result;
+    }
+
+    static Type defaultValue() {
+        static Type value;
+        return value;
+    }
+
+    bool completed() const noexcept { return completed_; }
+
+    void addListener(const ListenerCallback& callback) {
+        Lock lock(mutex_);
+        if (completed_) {
+            const auto result = result_;
+            const auto value = value_;
             lock.unlock();
-            callback(state->result, state->value);
+            callback(result, value);
         } else {
-            state->listeners.push_back(callback);
+            listeners_.emplace_back(callback);
         }
+    }
 
-        return *this;
+    Result wait(Type& value) {
+        Lock lock(mutex_);
+        while (!completed_) {
+            condition_.wait(lock);
+        }
+        value = value_;
+        return result_;
     }
 
-    Result get(Type& result) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    bool complete(const Type& value) {
+        if (completed_) {
+            return false;
+        }
+
+        Lock lock(mutex_);
+        value_ = value;
+        completed_ = true;
+        auto listeners = std::move(listeners_);
+        lock.unlock();
 
-        if (!state->complete) {
-            // Wait for result
-            while (!state->complete) {
-                state->condition.wait(lock);
-            }
+        for (auto& callback : listeners) {

Review comment:
       Ok, we just need to be careful in making sure that when a listener is added and the future is already completed, the listener needs to be triggered immediately.

##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -99,21 +100,24 @@ class Promise {
         }
 
         state->value = value;
-        state->result = Result();
+        state->result = DEFAULT_RESULT;
         state->complete = true;
 
-        typename std::list<ListenerCallback>::iterator it;
-        for (it = state->listeners.begin(); it != state->listeners.end(); ++it) {
-            ListenerCallback& callback = *it;
-            callback(state->result, state->value);
+        const auto listeners = std::move(state->listeners);
+
+        lock.unlock();
+
+        for (auto& callback : listeners) {
+            callback(DEFAULT_RESULT, value);
         }
 
         state->listeners.clear();

Review comment:
       the `state->listeners.clear()` would have to be done from within the mutex. Probably it might not be needed anymore? In what state is the `std::vector` left after the listeners are moved out?

##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -99,21 +100,24 @@ class Promise {
         }
 
         state->value = value;
-        state->result = Result();
+        state->result = DEFAULT_RESULT;
         state->complete = true;
 
-        typename std::list<ListenerCallback>::iterator it;
-        for (it = state->listeners.begin(); it != state->listeners.end(); ++it) {
-            ListenerCallback& callback = *it;
-            callback(state->result, state->value);
+        const auto listeners = std::move(state->listeners);
+
+        lock.unlock();
+
+        for (auto& callback : listeners) {
+            callback(DEFAULT_RESULT, value);
         }
 
         state->listeners.clear();

Review comment:
       The only thing I’m not 100 sure if it’s safe to keep using the listener vector after it's moved out. Eg. If a new listener is added after the future  is completed, it will add again into the moved vector which is in an undefined state. A I understand, the only guarantee is that the vector will be empty, but we have no idea whether we can add or not. 
   
   Instead of the move, it might actually be safer to do a swap with an empty std::vector.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat commented on a change in pull request #12586: [C++] Fix request timeout for GetLastMessageId doesn't work

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586#discussion_r741255837



##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -31,121 +32,138 @@ typedef std::unique_lock<std::mutex> Lock;
 namespace pulsar {
 
 template <typename Result, typename Type>
-struct InternalState {
-    std::mutex mutex;
-    std::condition_variable condition;
-    Result result;
-    Type value;
-    bool complete;
-
-    std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
    public:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
+    using ListenerCallback = std::function<void(Result, const Type&)>;
 
-    Future& addListener(ListenerCallback callback) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    // There's a bug about the defaulted default constructor for GCC < 4.9.1, so we cannot use
+    // `InternalState() = default` here.
+    InternalState() {}
+    InternalState(const InternalState&) = delete;
+    InternalState& operator=(const InternalState&) = delete;
 
-        if (state->complete) {
+    static Result defaultResult() {
+        static Result result;
+        return result;
+    }
+
+    static Type defaultValue() {
+        static Type value;
+        return value;
+    }
+
+    bool completed() const noexcept { return completed_; }
+
+    void addListener(const ListenerCallback& callback) {
+        Lock lock(mutex_);
+        if (completed_) {
+            const auto result = result_;
+            const auto value = value_;
             lock.unlock();
-            callback(state->result, state->value);
+            callback(result, value);
         } else {
-            state->listeners.push_back(callback);
+            listeners_.emplace_back(callback);
         }
+    }
 
-        return *this;
+    Result wait(Type& value) {
+        Lock lock(mutex_);
+        while (!completed_) {
+            condition_.wait(lock);
+        }
+        value = value_;
+        return result_;
     }
 
-    Result get(Type& result) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    bool complete(const Type& value) {
+        if (completed_) {

Review comment:
       If we don't do a compare-and-set operation (or grab the mutex at the beginning), we have a race condition here that will make us enter the completion phase more than once.

##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -31,121 +32,138 @@ typedef std::unique_lock<std::mutex> Lock;
 namespace pulsar {
 
 template <typename Result, typename Type>
-struct InternalState {
-    std::mutex mutex;
-    std::condition_variable condition;
-    Result result;
-    Type value;
-    bool complete;
-
-    std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
    public:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
+    using ListenerCallback = std::function<void(Result, const Type&)>;
 
-    Future& addListener(ListenerCallback callback) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    // There's a bug about the defaulted default constructor for GCC < 4.9.1, so we cannot use
+    // `InternalState() = default` here.
+    InternalState() {}
+    InternalState(const InternalState&) = delete;
+    InternalState& operator=(const InternalState&) = delete;
 
-        if (state->complete) {
+    static Result defaultResult() {
+        static Result result;
+        return result;
+    }
+
+    static Type defaultValue() {
+        static Type value;
+        return value;
+    }
+
+    bool completed() const noexcept { return completed_; }
+
+    void addListener(const ListenerCallback& callback) {
+        Lock lock(mutex_);
+        if (completed_) {
+            const auto result = result_;
+            const auto value = value_;
             lock.unlock();
-            callback(state->result, state->value);
+            callback(result, value);
         } else {
-            state->listeners.push_back(callback);
+            listeners_.emplace_back(callback);
         }
+    }
 
-        return *this;
+    Result wait(Type& value) {
+        Lock lock(mutex_);
+        while (!completed_) {
+            condition_.wait(lock);
+        }
+        value = value_;
+        return result_;
     }
 
-    Result get(Type& result) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    bool complete(const Type& value) {
+        if (completed_) {
+            return false;
+        }
+
+        Lock lock(mutex_);
+        value_ = value;
+        completed_ = true;
+        auto listeners = std::move(listeners_);
+        lock.unlock();
 
-        if (!state->complete) {
-            // Wait for result
-            while (!state->complete) {
-                state->condition.wait(lock);
-            }
+        for (auto& callback : listeners) {

Review comment:
       I think we'd still have a problem if listeners are being modified when the future is completed.

##########
File path: pulsar-client-cpp/lib/ClientConnection.cc
##########
@@ -1620,7 +1620,12 @@ Future<Result, MessageId> ClientConnection::newGetLastMessageId(uint64_t consume
 
     pendingGetLastMessageIdRequests_.insert(std::make_pair(requestId, promise));
     lock.unlock();
-    sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId), requestId);
+    sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId), requestId)

Review comment:
       I think we should keep this change separated from the below refactoring of the Future class.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat commented on a change in pull request #12586: [C++] Fix request timeout for GetLastMessageId doesn't work

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586#discussion_r741621639



##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -99,21 +100,24 @@ class Promise {
         }
 
         state->value = value;
-        state->result = Result();
+        state->result = DEFAULT_RESULT;
         state->complete = true;
 
-        typename std::list<ListenerCallback>::iterator it;
-        for (it = state->listeners.begin(); it != state->listeners.end(); ++it) {
-            ListenerCallback& callback = *it;
-            callback(state->result, state->value);
+        const auto listeners = std::move(state->listeners);
+
+        lock.unlock();
+
+        for (auto& callback : listeners) {
+            callback(DEFAULT_RESULT, value);
         }
 
         state->listeners.clear();

Review comment:
       the `state->listeners.clear()` would have to be done from within the mutex. Probably it might not be needed anymore? In what state is the `std::vector` left after the listeners are moved out?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on pull request #12586: [C++] Fix request timeout for GetLastMessageId doesn't work

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586#issuecomment-957745057


   I'll fix the compilation error soon.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat merged pull request #12586: [C++] Fix request timeout for GetLastMessageId doesn't work

Posted by GitBox <gi...@apache.org>.
merlimat merged pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on a change in pull request #12586: [C++] Fix request timeout for GetLastMessageId doesn't work

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586#discussion_r741562539



##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -31,121 +32,138 @@ typedef std::unique_lock<std::mutex> Lock;
 namespace pulsar {
 
 template <typename Result, typename Type>
-struct InternalState {
-    std::mutex mutex;
-    std::condition_variable condition;
-    Result result;
-    Type value;
-    bool complete;
-
-    std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
    public:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
+    using ListenerCallback = std::function<void(Result, const Type&)>;
 
-    Future& addListener(ListenerCallback callback) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    // There's a bug about the defaulted default constructor for GCC < 4.9.1, so we cannot use
+    // `InternalState() = default` here.
+    InternalState() {}
+    InternalState(const InternalState&) = delete;
+    InternalState& operator=(const InternalState&) = delete;
 
-        if (state->complete) {
+    static Result defaultResult() {
+        static Result result;
+        return result;
+    }
+
+    static Type defaultValue() {
+        static Type value;
+        return value;
+    }
+
+    bool completed() const noexcept { return completed_; }
+
+    void addListener(const ListenerCallback& callback) {
+        Lock lock(mutex_);
+        if (completed_) {
+            const auto result = result_;
+            const auto value = value_;
             lock.unlock();
-            callback(state->result, state->value);
+            callback(result, value);
         } else {
-            state->listeners.push_back(callback);
+            listeners_.emplace_back(callback);
         }
+    }
 
-        return *this;
+    Result wait(Type& value) {
+        Lock lock(mutex_);
+        while (!completed_) {
+            condition_.wait(lock);
+        }
+        value = value_;
+        return result_;
     }
 
-    Result get(Type& result) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    bool complete(const Type& value) {
+        if (completed_) {

Review comment:
       Yes, it's right.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on a change in pull request #12586: [C++] Fix request timeout for GetLastMessageId doesn't work

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586#discussion_r741562539



##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -31,121 +32,138 @@ typedef std::unique_lock<std::mutex> Lock;
 namespace pulsar {
 
 template <typename Result, typename Type>
-struct InternalState {
-    std::mutex mutex;
-    std::condition_variable condition;
-    Result result;
-    Type value;
-    bool complete;
-
-    std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
    public:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
+    using ListenerCallback = std::function<void(Result, const Type&)>;
 
-    Future& addListener(ListenerCallback callback) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    // There's a bug about the defaulted default constructor for GCC < 4.9.1, so we cannot use
+    // `InternalState() = default` here.
+    InternalState() {}
+    InternalState(const InternalState&) = delete;
+    InternalState& operator=(const InternalState&) = delete;
 
-        if (state->complete) {
+    static Result defaultResult() {
+        static Result result;
+        return result;
+    }
+
+    static Type defaultValue() {
+        static Type value;
+        return value;
+    }
+
+    bool completed() const noexcept { return completed_; }
+
+    void addListener(const ListenerCallback& callback) {
+        Lock lock(mutex_);
+        if (completed_) {
+            const auto result = result_;
+            const auto value = value_;
             lock.unlock();
-            callback(state->result, state->value);
+            callback(result, value);
         } else {
-            state->listeners.push_back(callback);
+            listeners_.emplace_back(callback);
         }
+    }
 
-        return *this;
+    Result wait(Type& value) {
+        Lock lock(mutex_);
+        while (!completed_) {
+            condition_.wait(lock);
+        }
+        value = value_;
+        return result_;
     }
 
-    Result get(Type& result) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    bool complete(const Type& value) {
+        if (completed_) {

Review comment:
       Yes, it's right.

##########
File path: pulsar-client-cpp/lib/ClientConnection.cc
##########
@@ -1620,7 +1620,12 @@ Future<Result, MessageId> ClientConnection::newGetLastMessageId(uint64_t consume
 
     pendingGetLastMessageIdRequests_.insert(std::make_pair(requestId, promise));
     lock.unlock();
-    sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId), requestId);
+    sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId), requestId)

Review comment:
       I agree. I'll remove the unnecessary refactoring of this class but still add some changes.

##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -31,121 +32,138 @@ typedef std::unique_lock<std::mutex> Lock;
 namespace pulsar {
 
 template <typename Result, typename Type>
-struct InternalState {
-    std::mutex mutex;
-    std::condition_variable condition;
-    Result result;
-    Type value;
-    bool complete;
-
-    std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
    public:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
+    using ListenerCallback = std::function<void(Result, const Type&)>;
 
-    Future& addListener(ListenerCallback callback) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    // There's a bug about the defaulted default constructor for GCC < 4.9.1, so we cannot use
+    // `InternalState() = default` here.
+    InternalState() {}
+    InternalState(const InternalState&) = delete;
+    InternalState& operator=(const InternalState&) = delete;
 
-        if (state->complete) {
+    static Result defaultResult() {
+        static Result result;
+        return result;
+    }
+
+    static Type defaultValue() {
+        static Type value;
+        return value;
+    }
+
+    bool completed() const noexcept { return completed_; }
+
+    void addListener(const ListenerCallback& callback) {
+        Lock lock(mutex_);
+        if (completed_) {
+            const auto result = result_;
+            const auto value = value_;
             lock.unlock();
-            callback(state->result, state->value);
+            callback(result, value);
         } else {
-            state->listeners.push_back(callback);
+            listeners_.emplace_back(callback);
         }
+    }
 
-        return *this;
+    Result wait(Type& value) {
+        Lock lock(mutex_);
+        while (!completed_) {
+            condition_.wait(lock);
+        }
+        value = value_;
+        return result_;
     }
 
-    Result get(Type& result) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    bool complete(const Type& value) {
+        if (completed_) {
+            return false;
+        }
+
+        Lock lock(mutex_);
+        value_ = value;
+        completed_ = true;
+        auto listeners = std::move(listeners_);
+        lock.unlock();
 
-        if (!state->complete) {
-            // Wait for result
-            while (!state->complete) {
-                state->condition.wait(lock);
-            }
+        for (auto& callback : listeners) {

Review comment:
        The `listeners` is a local variable and moved from the `listeners_` field. And the move operation is protected by `mutex`.

##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -31,121 +32,138 @@ typedef std::unique_lock<std::mutex> Lock;
 namespace pulsar {
 
 template <typename Result, typename Type>
-struct InternalState {
-    std::mutex mutex;
-    std::condition_variable condition;
-    Result result;
-    Type value;
-    bool complete;
-
-    std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
    public:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
+    using ListenerCallback = std::function<void(Result, const Type&)>;
 
-    Future& addListener(ListenerCallback callback) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    // There's a bug about the defaulted default constructor for GCC < 4.9.1, so we cannot use
+    // `InternalState() = default` here.
+    InternalState() {}
+    InternalState(const InternalState&) = delete;
+    InternalState& operator=(const InternalState&) = delete;
 
-        if (state->complete) {
+    static Result defaultResult() {
+        static Result result;
+        return result;
+    }
+
+    static Type defaultValue() {
+        static Type value;
+        return value;
+    }
+
+    bool completed() const noexcept { return completed_; }
+
+    void addListener(const ListenerCallback& callback) {
+        Lock lock(mutex_);
+        if (completed_) {
+            const auto result = result_;
+            const auto value = value_;
             lock.unlock();
-            callback(state->result, state->value);
+            callback(result, value);
         } else {
-            state->listeners.push_back(callback);
+            listeners_.emplace_back(callback);
         }
+    }
 
-        return *this;
+    Result wait(Type& value) {
+        Lock lock(mutex_);
+        while (!completed_) {
+            condition_.wait(lock);
+        }
+        value = value_;
+        return result_;
     }
 
-    Result get(Type& result) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    bool complete(const Type& value) {
+        if (completed_) {
+            return false;
+        }
+
+        Lock lock(mutex_);
+        value_ = value;
+        completed_ = true;
+        auto listeners = std::move(listeners_);
+        lock.unlock();
 
-        if (!state->complete) {
-            // Wait for result
-            while (!state->complete) {
-                state->condition.wait(lock);
-            }
+        for (auto& callback : listeners) {

Review comment:
       Yes. So I just revert the unnecessary changes and only move the iteration of listeners outside the locked block.

##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -99,21 +100,24 @@ class Promise {
         }
 
         state->value = value;
-        state->result = Result();
+        state->result = DEFAULT_RESULT;
         state->complete = true;
 
-        typename std::list<ListenerCallback>::iterator it;
-        for (it = state->listeners.begin(); it != state->listeners.end(); ++it) {
-            ListenerCallback& callback = *it;
-            callback(state->result, state->value);
+        const auto listeners = std::move(state->listeners);
+
+        lock.unlock();
+
+        for (auto& callback : listeners) {
+            callback(DEFAULT_RESULT, value);
         }
 
         state->listeners.clear();

Review comment:
       It's right. I just forgot to remove it. (I have removed it from `setFailed`)

##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -99,21 +100,24 @@ class Promise {
         }
 
         state->value = value;
-        state->result = Result();
+        state->result = DEFAULT_RESULT;
         state->complete = true;
 
-        typename std::list<ListenerCallback>::iterator it;
-        for (it = state->listeners.begin(); it != state->listeners.end(); ++it) {
-            ListenerCallback& callback = *it;
-            callback(state->result, state->value);
+        const auto listeners = std::move(state->listeners);
+
+        lock.unlock();
+
+        for (auto& callback : listeners) {
+            callback(DEFAULT_RESULT, value);
         }
 
         state->listeners.clear();

Review comment:
       Sounds reasonable. Ideally we should not use the object that has been moved. I'll adopt the swap way.

##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -31,121 +32,138 @@ typedef std::unique_lock<std::mutex> Lock;
 namespace pulsar {
 
 template <typename Result, typename Type>
-struct InternalState {
-    std::mutex mutex;
-    std::condition_variable condition;
-    Result result;
-    Type value;
-    bool complete;
-
-    std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
    public:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
+    using ListenerCallback = std::function<void(Result, const Type&)>;
 
-    Future& addListener(ListenerCallback callback) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    // There's a bug about the defaulted default constructor for GCC < 4.9.1, so we cannot use
+    // `InternalState() = default` here.
+    InternalState() {}
+    InternalState(const InternalState&) = delete;
+    InternalState& operator=(const InternalState&) = delete;
 
-        if (state->complete) {
+    static Result defaultResult() {
+        static Result result;
+        return result;
+    }
+
+    static Type defaultValue() {
+        static Type value;
+        return value;
+    }
+
+    bool completed() const noexcept { return completed_; }
+
+    void addListener(const ListenerCallback& callback) {
+        Lock lock(mutex_);
+        if (completed_) {
+            const auto result = result_;
+            const auto value = value_;
             lock.unlock();
-            callback(state->result, state->value);
+            callback(result, value);
         } else {
-            state->listeners.push_back(callback);
+            listeners_.emplace_back(callback);
         }
+    }
 
-        return *this;
+    Result wait(Type& value) {
+        Lock lock(mutex_);
+        while (!completed_) {
+            condition_.wait(lock);
+        }
+        value = value_;
+        return result_;
     }
 
-    Result get(Type& result) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    bool complete(const Type& value) {
+        if (completed_) {
+            return false;
+        }
+
+        Lock lock(mutex_);
+        value_ = value;
+        completed_ = true;
+        auto listeners = std::move(listeners_);
+        lock.unlock();
 
-        if (!state->complete) {
-            // Wait for result
-            while (!state->complete) {
-                state->condition.wait(lock);
-            }
+        for (auto& callback : listeners) {

Review comment:
       Yes. So I just revert the unnecessary changes and only move the iteration of listeners outside the locked block.

##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -99,21 +100,24 @@ class Promise {
         }
 
         state->value = value;
-        state->result = Result();
+        state->result = DEFAULT_RESULT;
         state->complete = true;
 
-        typename std::list<ListenerCallback>::iterator it;
-        for (it = state->listeners.begin(); it != state->listeners.end(); ++it) {
-            ListenerCallback& callback = *it;
-            callback(state->result, state->value);
+        const auto listeners = std::move(state->listeners);
+
+        lock.unlock();
+
+        for (auto& callback : listeners) {
+            callback(DEFAULT_RESULT, value);
         }
 
         state->listeners.clear();

Review comment:
       It's right. I just forgot to remove it. (I have removed it from `setFailed`)

##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -99,21 +100,24 @@ class Promise {
         }
 
         state->value = value;
-        state->result = Result();
+        state->result = DEFAULT_RESULT;
         state->complete = true;
 
-        typename std::list<ListenerCallback>::iterator it;
-        for (it = state->listeners.begin(); it != state->listeners.end(); ++it) {
-            ListenerCallback& callback = *it;
-            callback(state->result, state->value);
+        const auto listeners = std::move(state->listeners);
+
+        lock.unlock();
+
+        for (auto& callback : listeners) {
+            callback(DEFAULT_RESULT, value);
         }
 
         state->listeners.clear();

Review comment:
       Sounds reasonable. Ideally we should not use the object that has been moved. I'll adopt the swap way.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat commented on a change in pull request #12586: [C++] Fix request timeout for GetLastMessageId doesn't work

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586#discussion_r741255837



##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -31,121 +32,138 @@ typedef std::unique_lock<std::mutex> Lock;
 namespace pulsar {
 
 template <typename Result, typename Type>
-struct InternalState {
-    std::mutex mutex;
-    std::condition_variable condition;
-    Result result;
-    Type value;
-    bool complete;
-
-    std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
    public:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
+    using ListenerCallback = std::function<void(Result, const Type&)>;
 
-    Future& addListener(ListenerCallback callback) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    // There's a bug about the defaulted default constructor for GCC < 4.9.1, so we cannot use
+    // `InternalState() = default` here.
+    InternalState() {}
+    InternalState(const InternalState&) = delete;
+    InternalState& operator=(const InternalState&) = delete;
 
-        if (state->complete) {
+    static Result defaultResult() {
+        static Result result;
+        return result;
+    }
+
+    static Type defaultValue() {
+        static Type value;
+        return value;
+    }
+
+    bool completed() const noexcept { return completed_; }
+
+    void addListener(const ListenerCallback& callback) {
+        Lock lock(mutex_);
+        if (completed_) {
+            const auto result = result_;
+            const auto value = value_;
             lock.unlock();
-            callback(state->result, state->value);
+            callback(result, value);
         } else {
-            state->listeners.push_back(callback);
+            listeners_.emplace_back(callback);
         }
+    }
 
-        return *this;
+    Result wait(Type& value) {
+        Lock lock(mutex_);
+        while (!completed_) {
+            condition_.wait(lock);
+        }
+        value = value_;
+        return result_;
     }
 
-    Result get(Type& result) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    bool complete(const Type& value) {
+        if (completed_) {

Review comment:
       If we don't do a compare-and-set operation (or grab the mutex at the beginning), we have a race condition here that will make us enter the completion phase more than once.

##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -31,121 +32,138 @@ typedef std::unique_lock<std::mutex> Lock;
 namespace pulsar {
 
 template <typename Result, typename Type>
-struct InternalState {
-    std::mutex mutex;
-    std::condition_variable condition;
-    Result result;
-    Type value;
-    bool complete;
-
-    std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
    public:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
+    using ListenerCallback = std::function<void(Result, const Type&)>;
 
-    Future& addListener(ListenerCallback callback) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    // There's a bug about the defaulted default constructor for GCC < 4.9.1, so we cannot use
+    // `InternalState() = default` here.
+    InternalState() {}
+    InternalState(const InternalState&) = delete;
+    InternalState& operator=(const InternalState&) = delete;
 
-        if (state->complete) {
+    static Result defaultResult() {
+        static Result result;
+        return result;
+    }
+
+    static Type defaultValue() {
+        static Type value;
+        return value;
+    }
+
+    bool completed() const noexcept { return completed_; }
+
+    void addListener(const ListenerCallback& callback) {
+        Lock lock(mutex_);
+        if (completed_) {
+            const auto result = result_;
+            const auto value = value_;
             lock.unlock();
-            callback(state->result, state->value);
+            callback(result, value);
         } else {
-            state->listeners.push_back(callback);
+            listeners_.emplace_back(callback);
         }
+    }
 
-        return *this;
+    Result wait(Type& value) {
+        Lock lock(mutex_);
+        while (!completed_) {
+            condition_.wait(lock);
+        }
+        value = value_;
+        return result_;
     }
 
-    Result get(Type& result) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    bool complete(const Type& value) {
+        if (completed_) {
+            return false;
+        }
+
+        Lock lock(mutex_);
+        value_ = value;
+        completed_ = true;
+        auto listeners = std::move(listeners_);
+        lock.unlock();
 
-        if (!state->complete) {
-            // Wait for result
-            while (!state->complete) {
-                state->condition.wait(lock);
-            }
+        for (auto& callback : listeners) {

Review comment:
       I think we'd still have a problem if listeners are being modified when the future is completed.

##########
File path: pulsar-client-cpp/lib/ClientConnection.cc
##########
@@ -1620,7 +1620,12 @@ Future<Result, MessageId> ClientConnection::newGetLastMessageId(uint64_t consume
 
     pendingGetLastMessageIdRequests_.insert(std::make_pair(requestId, promise));
     lock.unlock();
-    sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId), requestId);
+    sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId), requestId)

Review comment:
       I think we should keep this change separated from the below refactoring of the Future class.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on pull request #12586: [C++] Fix request timeout for GetLastMessageId doesn't work

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586#issuecomment-957745057


   I'll fix the compilation error soon.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on pull request #12586: [WIP][C++] Fix request timeout for GetLastMessageId doesn't work

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586#issuecomment-958591750


   Mark it as WIP first. I'll fix the tests in local env before pushing the commits. And I will revert the refactoring of Future.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat merged pull request #12586: [C++] Fix request timeout for GetLastMessageId doesn't work

Posted by GitBox <gi...@apache.org>.
merlimat merged pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on pull request #12586: [WIP][C++] Fix request timeout for GetLastMessageId doesn't work

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586#issuecomment-958591750


   Mark it as WIP first. I'll fix the tests in local env before pushing the commits. And I will revert the refactoring of Future.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on pull request #12586: [WIP][C++] Fix request timeout for GetLastMessageId doesn't work

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586#issuecomment-958591750


   Mark it as WIP first. I'll fix the tests in local env before pushing the commits. And I will revert the refactoring of Future.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on a change in pull request #12586: [C++] Fix request timeout for GetLastMessageId doesn't work

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586#discussion_r741563569



##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -31,121 +32,138 @@ typedef std::unique_lock<std::mutex> Lock;
 namespace pulsar {
 
 template <typename Result, typename Type>
-struct InternalState {
-    std::mutex mutex;
-    std::condition_variable condition;
-    Result result;
-    Type value;
-    bool complete;
-
-    std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
    public:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
+    using ListenerCallback = std::function<void(Result, const Type&)>;
 
-    Future& addListener(ListenerCallback callback) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    // There's a bug about the defaulted default constructor for GCC < 4.9.1, so we cannot use
+    // `InternalState() = default` here.
+    InternalState() {}
+    InternalState(const InternalState&) = delete;
+    InternalState& operator=(const InternalState&) = delete;
 
-        if (state->complete) {
+    static Result defaultResult() {
+        static Result result;
+        return result;
+    }
+
+    static Type defaultValue() {
+        static Type value;
+        return value;
+    }
+
+    bool completed() const noexcept { return completed_; }
+
+    void addListener(const ListenerCallback& callback) {
+        Lock lock(mutex_);
+        if (completed_) {
+            const auto result = result_;
+            const auto value = value_;
             lock.unlock();
-            callback(state->result, state->value);
+            callback(result, value);
         } else {
-            state->listeners.push_back(callback);
+            listeners_.emplace_back(callback);
         }
+    }
 
-        return *this;
+    Result wait(Type& value) {
+        Lock lock(mutex_);
+        while (!completed_) {
+            condition_.wait(lock);
+        }
+        value = value_;
+        return result_;
     }
 
-    Result get(Type& result) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    bool complete(const Type& value) {
+        if (completed_) {
+            return false;
+        }
+
+        Lock lock(mutex_);
+        value_ = value;
+        completed_ = true;
+        auto listeners = std::move(listeners_);
+        lock.unlock();
 
-        if (!state->complete) {
-            // Wait for result
-            while (!state->complete) {
-                state->condition.wait(lock);
-            }
+        for (auto& callback : listeners) {

Review comment:
        The `listeners` is a local variable and moved from the `listeners_` field. And the move operation is protected by `mutex`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat commented on a change in pull request #12586: [C++] Fix request timeout for GetLastMessageId doesn't work

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586#discussion_r741607586



##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -31,121 +32,138 @@ typedef std::unique_lock<std::mutex> Lock;
 namespace pulsar {
 
 template <typename Result, typename Type>
-struct InternalState {
-    std::mutex mutex;
-    std::condition_variable condition;
-    Result result;
-    Type value;
-    bool complete;
-
-    std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
    public:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
+    using ListenerCallback = std::function<void(Result, const Type&)>;
 
-    Future& addListener(ListenerCallback callback) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    // There's a bug about the defaulted default constructor for GCC < 4.9.1, so we cannot use
+    // `InternalState() = default` here.
+    InternalState() {}
+    InternalState(const InternalState&) = delete;
+    InternalState& operator=(const InternalState&) = delete;
 
-        if (state->complete) {
+    static Result defaultResult() {
+        static Result result;
+        return result;
+    }
+
+    static Type defaultValue() {
+        static Type value;
+        return value;
+    }
+
+    bool completed() const noexcept { return completed_; }
+
+    void addListener(const ListenerCallback& callback) {
+        Lock lock(mutex_);
+        if (completed_) {
+            const auto result = result_;
+            const auto value = value_;
             lock.unlock();
-            callback(state->result, state->value);
+            callback(result, value);
         } else {
-            state->listeners.push_back(callback);
+            listeners_.emplace_back(callback);
         }
+    }
 
-        return *this;
+    Result wait(Type& value) {
+        Lock lock(mutex_);
+        while (!completed_) {
+            condition_.wait(lock);
+        }
+        value = value_;
+        return result_;
     }
 
-    Result get(Type& result) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    bool complete(const Type& value) {
+        if (completed_) {
+            return false;
+        }
+
+        Lock lock(mutex_);
+        value_ = value;
+        completed_ = true;
+        auto listeners = std::move(listeners_);
+        lock.unlock();
 
-        if (!state->complete) {
-            // Wait for result
-            while (!state->complete) {
-                state->condition.wait(lock);
-            }
+        for (auto& callback : listeners) {

Review comment:
       Ok, we just need to be careful in making sure that when a listener is added and the future is already completed, the listener needs to be triggered immediately.

##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -99,21 +100,24 @@ class Promise {
         }
 
         state->value = value;
-        state->result = Result();
+        state->result = DEFAULT_RESULT;
         state->complete = true;
 
-        typename std::list<ListenerCallback>::iterator it;
-        for (it = state->listeners.begin(); it != state->listeners.end(); ++it) {
-            ListenerCallback& callback = *it;
-            callback(state->result, state->value);
+        const auto listeners = std::move(state->listeners);
+
+        lock.unlock();
+
+        for (auto& callback : listeners) {
+            callback(DEFAULT_RESULT, value);
         }
 
         state->listeners.clear();

Review comment:
       the `state->listeners.clear()` would have to be done from within the mutex. Probably it might not be needed anymore? In what state is the `std::vector` left after the listeners are moved out?

##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -99,21 +100,24 @@ class Promise {
         }
 
         state->value = value;
-        state->result = Result();
+        state->result = DEFAULT_RESULT;
         state->complete = true;
 
-        typename std::list<ListenerCallback>::iterator it;
-        for (it = state->listeners.begin(); it != state->listeners.end(); ++it) {
-            ListenerCallback& callback = *it;
-            callback(state->result, state->value);
+        const auto listeners = std::move(state->listeners);
+
+        lock.unlock();
+
+        for (auto& callback : listeners) {
+            callback(DEFAULT_RESULT, value);
         }
 
         state->listeners.clear();

Review comment:
       The only thing I’m not 100 sure if it’s safe to keep using the listener vector after it's moved out. Eg. If a new listener is added after the future  is completed, it will add again into the moved vector which is in an undefined state. A I understand, the only guarantee is that the vector will be empty, but we have no idea whether we can add or not. 
   
   Instead of the move, it might actually be safer to do a swap with an empty std::vector.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on a change in pull request #12586: [C++] Fix request timeout for GetLastMessageId doesn't work

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586#discussion_r741617257



##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -31,121 +32,138 @@ typedef std::unique_lock<std::mutex> Lock;
 namespace pulsar {
 
 template <typename Result, typename Type>
-struct InternalState {
-    std::mutex mutex;
-    std::condition_variable condition;
-    Result result;
-    Type value;
-    bool complete;
-
-    std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
    public:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
+    using ListenerCallback = std::function<void(Result, const Type&)>;
 
-    Future& addListener(ListenerCallback callback) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    // There's a bug about the defaulted default constructor for GCC < 4.9.1, so we cannot use
+    // `InternalState() = default` here.
+    InternalState() {}
+    InternalState(const InternalState&) = delete;
+    InternalState& operator=(const InternalState&) = delete;
 
-        if (state->complete) {
+    static Result defaultResult() {
+        static Result result;
+        return result;
+    }
+
+    static Type defaultValue() {
+        static Type value;
+        return value;
+    }
+
+    bool completed() const noexcept { return completed_; }
+
+    void addListener(const ListenerCallback& callback) {
+        Lock lock(mutex_);
+        if (completed_) {
+            const auto result = result_;
+            const auto value = value_;
             lock.unlock();
-            callback(state->result, state->value);
+            callback(result, value);
         } else {
-            state->listeners.push_back(callback);
+            listeners_.emplace_back(callback);
         }
+    }
 
-        return *this;
+    Result wait(Type& value) {
+        Lock lock(mutex_);
+        while (!completed_) {
+            condition_.wait(lock);
+        }
+        value = value_;
+        return result_;
     }
 
-    Result get(Type& result) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    bool complete(const Type& value) {
+        if (completed_) {
+            return false;
+        }
+
+        Lock lock(mutex_);
+        value_ = value;
+        completed_ = true;
+        auto listeners = std::move(listeners_);
+        lock.unlock();
 
-        if (!state->complete) {
-            // Wait for result
-            while (!state->complete) {
-                state->condition.wait(lock);
-            }
+        for (auto& callback : listeners) {

Review comment:
       Yes. So I just revert the unnecessary changes and only move the iteration of listeners outside the locked block.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat commented on a change in pull request #12586: [C++] Fix request timeout for GetLastMessageId doesn't work

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586#discussion_r741626030



##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -99,21 +100,24 @@ class Promise {
         }
 
         state->value = value;
-        state->result = Result();
+        state->result = DEFAULT_RESULT;
         state->complete = true;
 
-        typename std::list<ListenerCallback>::iterator it;
-        for (it = state->listeners.begin(); it != state->listeners.end(); ++it) {
-            ListenerCallback& callback = *it;
-            callback(state->result, state->value);
+        const auto listeners = std::move(state->listeners);
+
+        lock.unlock();
+
+        for (auto& callback : listeners) {
+            callback(DEFAULT_RESULT, value);
         }
 
         state->listeners.clear();

Review comment:
       The only thing I’m not 100 sure if it’s safe to keep using the listener vector after it's moved out. Eg. If a new listener is added after the future  is completed, it will add again into the moved vector which is in an undefined state. A I understand, the only guarantee is that the vector will be empty, but we have no idea whether we can add or not. 
   
   Instead of the move, it might actually be safer to do a swap with an empty std::vector.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat commented on a change in pull request #12586: [C++] Fix request timeout for GetLastMessageId doesn't work

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586#discussion_r741607586



##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -31,121 +32,138 @@ typedef std::unique_lock<std::mutex> Lock;
 namespace pulsar {
 
 template <typename Result, typename Type>
-struct InternalState {
-    std::mutex mutex;
-    std::condition_variable condition;
-    Result result;
-    Type value;
-    bool complete;
-
-    std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
    public:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
+    using ListenerCallback = std::function<void(Result, const Type&)>;
 
-    Future& addListener(ListenerCallback callback) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    // There's a bug about the defaulted default constructor for GCC < 4.9.1, so we cannot use
+    // `InternalState() = default` here.
+    InternalState() {}
+    InternalState(const InternalState&) = delete;
+    InternalState& operator=(const InternalState&) = delete;
 
-        if (state->complete) {
+    static Result defaultResult() {
+        static Result result;
+        return result;
+    }
+
+    static Type defaultValue() {
+        static Type value;
+        return value;
+    }
+
+    bool completed() const noexcept { return completed_; }
+
+    void addListener(const ListenerCallback& callback) {
+        Lock lock(mutex_);
+        if (completed_) {
+            const auto result = result_;
+            const auto value = value_;
             lock.unlock();
-            callback(state->result, state->value);
+            callback(result, value);
         } else {
-            state->listeners.push_back(callback);
+            listeners_.emplace_back(callback);
         }
+    }
 
-        return *this;
+    Result wait(Type& value) {
+        Lock lock(mutex_);
+        while (!completed_) {
+            condition_.wait(lock);
+        }
+        value = value_;
+        return result_;
     }
 
-    Result get(Type& result) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    bool complete(const Type& value) {
+        if (completed_) {
+            return false;
+        }
+
+        Lock lock(mutex_);
+        value_ = value;
+        completed_ = true;
+        auto listeners = std::move(listeners_);
+        lock.unlock();
 
-        if (!state->complete) {
-            // Wait for result
-            while (!state->complete) {
-                state->condition.wait(lock);
-            }
+        for (auto& callback : listeners) {

Review comment:
       Ok, we just need to be careful in making sure that when a listener is added and the future is already completed, the listener needs to be triggered immediately.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on a change in pull request #12586: [C++] Fix request timeout for GetLastMessageId doesn't work

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586#discussion_r741622135



##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -99,21 +100,24 @@ class Promise {
         }
 
         state->value = value;
-        state->result = Result();
+        state->result = DEFAULT_RESULT;
         state->complete = true;
 
-        typename std::list<ListenerCallback>::iterator it;
-        for (it = state->listeners.begin(); it != state->listeners.end(); ++it) {
-            ListenerCallback& callback = *it;
-            callback(state->result, state->value);
+        const auto listeners = std::move(state->listeners);
+
+        lock.unlock();
+
+        for (auto& callback : listeners) {
+            callback(DEFAULT_RESULT, value);
         }
 
         state->listeners.clear();

Review comment:
       It's right. I just forgot to remove it. (I have removed it from `setFailed`)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat commented on a change in pull request #12586: [C++] Fix request timeout for GetLastMessageId doesn't work

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586#discussion_r741255837



##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -31,121 +32,138 @@ typedef std::unique_lock<std::mutex> Lock;
 namespace pulsar {
 
 template <typename Result, typename Type>
-struct InternalState {
-    std::mutex mutex;
-    std::condition_variable condition;
-    Result result;
-    Type value;
-    bool complete;
-
-    std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
    public:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
+    using ListenerCallback = std::function<void(Result, const Type&)>;
 
-    Future& addListener(ListenerCallback callback) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    // There's a bug about the defaulted default constructor for GCC < 4.9.1, so we cannot use
+    // `InternalState() = default` here.
+    InternalState() {}
+    InternalState(const InternalState&) = delete;
+    InternalState& operator=(const InternalState&) = delete;
 
-        if (state->complete) {
+    static Result defaultResult() {
+        static Result result;
+        return result;
+    }
+
+    static Type defaultValue() {
+        static Type value;
+        return value;
+    }
+
+    bool completed() const noexcept { return completed_; }
+
+    void addListener(const ListenerCallback& callback) {
+        Lock lock(mutex_);
+        if (completed_) {
+            const auto result = result_;
+            const auto value = value_;
             lock.unlock();
-            callback(state->result, state->value);
+            callback(result, value);
         } else {
-            state->listeners.push_back(callback);
+            listeners_.emplace_back(callback);
         }
+    }
 
-        return *this;
+    Result wait(Type& value) {
+        Lock lock(mutex_);
+        while (!completed_) {
+            condition_.wait(lock);
+        }
+        value = value_;
+        return result_;
     }
 
-    Result get(Type& result) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    bool complete(const Type& value) {
+        if (completed_) {

Review comment:
       If we don't do a compare-and-set operation (or grab the mutex at the beginning), we have a race condition here that will make us enter the completion phase more than once.

##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -31,121 +32,138 @@ typedef std::unique_lock<std::mutex> Lock;
 namespace pulsar {
 
 template <typename Result, typename Type>
-struct InternalState {
-    std::mutex mutex;
-    std::condition_variable condition;
-    Result result;
-    Type value;
-    bool complete;
-
-    std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
    public:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
+    using ListenerCallback = std::function<void(Result, const Type&)>;
 
-    Future& addListener(ListenerCallback callback) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    // There's a bug about the defaulted default constructor for GCC < 4.9.1, so we cannot use
+    // `InternalState() = default` here.
+    InternalState() {}
+    InternalState(const InternalState&) = delete;
+    InternalState& operator=(const InternalState&) = delete;
 
-        if (state->complete) {
+    static Result defaultResult() {
+        static Result result;
+        return result;
+    }
+
+    static Type defaultValue() {
+        static Type value;
+        return value;
+    }
+
+    bool completed() const noexcept { return completed_; }
+
+    void addListener(const ListenerCallback& callback) {
+        Lock lock(mutex_);
+        if (completed_) {
+            const auto result = result_;
+            const auto value = value_;
             lock.unlock();
-            callback(state->result, state->value);
+            callback(result, value);
         } else {
-            state->listeners.push_back(callback);
+            listeners_.emplace_back(callback);
         }
+    }
 
-        return *this;
+    Result wait(Type& value) {
+        Lock lock(mutex_);
+        while (!completed_) {
+            condition_.wait(lock);
+        }
+        value = value_;
+        return result_;
     }
 
-    Result get(Type& result) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    bool complete(const Type& value) {
+        if (completed_) {
+            return false;
+        }
+
+        Lock lock(mutex_);
+        value_ = value;
+        completed_ = true;
+        auto listeners = std::move(listeners_);
+        lock.unlock();
 
-        if (!state->complete) {
-            // Wait for result
-            while (!state->complete) {
-                state->condition.wait(lock);
-            }
+        for (auto& callback : listeners) {

Review comment:
       I think we'd still have a problem if listeners are being modified when the future is completed.

##########
File path: pulsar-client-cpp/lib/ClientConnection.cc
##########
@@ -1620,7 +1620,12 @@ Future<Result, MessageId> ClientConnection::newGetLastMessageId(uint64_t consume
 
     pendingGetLastMessageIdRequests_.insert(std::make_pair(requestId, promise));
     lock.unlock();
-    sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId), requestId);
+    sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId), requestId)

Review comment:
       I think we should keep this change separated from the below refactoring of the Future class.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on a change in pull request #12586: [C++] Fix request timeout for GetLastMessageId doesn't work

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586#discussion_r741562539



##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -31,121 +32,138 @@ typedef std::unique_lock<std::mutex> Lock;
 namespace pulsar {
 
 template <typename Result, typename Type>
-struct InternalState {
-    std::mutex mutex;
-    std::condition_variable condition;
-    Result result;
-    Type value;
-    bool complete;
-
-    std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
    public:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
+    using ListenerCallback = std::function<void(Result, const Type&)>;
 
-    Future& addListener(ListenerCallback callback) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    // There's a bug about the defaulted default constructor for GCC < 4.9.1, so we cannot use
+    // `InternalState() = default` here.
+    InternalState() {}
+    InternalState(const InternalState&) = delete;
+    InternalState& operator=(const InternalState&) = delete;
 
-        if (state->complete) {
+    static Result defaultResult() {
+        static Result result;
+        return result;
+    }
+
+    static Type defaultValue() {
+        static Type value;
+        return value;
+    }
+
+    bool completed() const noexcept { return completed_; }
+
+    void addListener(const ListenerCallback& callback) {
+        Lock lock(mutex_);
+        if (completed_) {
+            const auto result = result_;
+            const auto value = value_;
             lock.unlock();
-            callback(state->result, state->value);
+            callback(result, value);
         } else {
-            state->listeners.push_back(callback);
+            listeners_.emplace_back(callback);
         }
+    }
 
-        return *this;
+    Result wait(Type& value) {
+        Lock lock(mutex_);
+        while (!completed_) {
+            condition_.wait(lock);
+        }
+        value = value_;
+        return result_;
     }
 
-    Result get(Type& result) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    bool complete(const Type& value) {
+        if (completed_) {

Review comment:
       Yes, it's right.

##########
File path: pulsar-client-cpp/lib/ClientConnection.cc
##########
@@ -1620,7 +1620,12 @@ Future<Result, MessageId> ClientConnection::newGetLastMessageId(uint64_t consume
 
     pendingGetLastMessageIdRequests_.insert(std::make_pair(requestId, promise));
     lock.unlock();
-    sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId), requestId);
+    sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId), requestId)

Review comment:
       I agree. I'll remove the unnecessary refactoring of this class but still add some changes.

##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -31,121 +32,138 @@ typedef std::unique_lock<std::mutex> Lock;
 namespace pulsar {
 
 template <typename Result, typename Type>
-struct InternalState {
-    std::mutex mutex;
-    std::condition_variable condition;
-    Result result;
-    Type value;
-    bool complete;
-
-    std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
    public:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
+    using ListenerCallback = std::function<void(Result, const Type&)>;
 
-    Future& addListener(ListenerCallback callback) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    // There's a bug about the defaulted default constructor for GCC < 4.9.1, so we cannot use
+    // `InternalState() = default` here.
+    InternalState() {}
+    InternalState(const InternalState&) = delete;
+    InternalState& operator=(const InternalState&) = delete;
 
-        if (state->complete) {
+    static Result defaultResult() {
+        static Result result;
+        return result;
+    }
+
+    static Type defaultValue() {
+        static Type value;
+        return value;
+    }
+
+    bool completed() const noexcept { return completed_; }
+
+    void addListener(const ListenerCallback& callback) {
+        Lock lock(mutex_);
+        if (completed_) {
+            const auto result = result_;
+            const auto value = value_;
             lock.unlock();
-            callback(state->result, state->value);
+            callback(result, value);
         } else {
-            state->listeners.push_back(callback);
+            listeners_.emplace_back(callback);
         }
+    }
 
-        return *this;
+    Result wait(Type& value) {
+        Lock lock(mutex_);
+        while (!completed_) {
+            condition_.wait(lock);
+        }
+        value = value_;
+        return result_;
     }
 
-    Result get(Type& result) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    bool complete(const Type& value) {
+        if (completed_) {
+            return false;
+        }
+
+        Lock lock(mutex_);
+        value_ = value;
+        completed_ = true;
+        auto listeners = std::move(listeners_);
+        lock.unlock();
 
-        if (!state->complete) {
-            // Wait for result
-            while (!state->complete) {
-                state->condition.wait(lock);
-            }
+        for (auto& callback : listeners) {

Review comment:
        The `listeners` is a local variable and moved from the `listeners_` field. And the move operation is protected by `mutex`.

##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -31,121 +32,138 @@ typedef std::unique_lock<std::mutex> Lock;
 namespace pulsar {
 
 template <typename Result, typename Type>
-struct InternalState {
-    std::mutex mutex;
-    std::condition_variable condition;
-    Result result;
-    Type value;
-    bool complete;
-
-    std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
    public:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
+    using ListenerCallback = std::function<void(Result, const Type&)>;
 
-    Future& addListener(ListenerCallback callback) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    // There's a bug about the defaulted default constructor for GCC < 4.9.1, so we cannot use
+    // `InternalState() = default` here.
+    InternalState() {}
+    InternalState(const InternalState&) = delete;
+    InternalState& operator=(const InternalState&) = delete;
 
-        if (state->complete) {
+    static Result defaultResult() {
+        static Result result;
+        return result;
+    }
+
+    static Type defaultValue() {
+        static Type value;
+        return value;
+    }
+
+    bool completed() const noexcept { return completed_; }
+
+    void addListener(const ListenerCallback& callback) {
+        Lock lock(mutex_);
+        if (completed_) {
+            const auto result = result_;
+            const auto value = value_;
             lock.unlock();
-            callback(state->result, state->value);
+            callback(result, value);
         } else {
-            state->listeners.push_back(callback);
+            listeners_.emplace_back(callback);
         }
+    }
 
-        return *this;
+    Result wait(Type& value) {
+        Lock lock(mutex_);
+        while (!completed_) {
+            condition_.wait(lock);
+        }
+        value = value_;
+        return result_;
     }
 
-    Result get(Type& result) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    bool complete(const Type& value) {
+        if (completed_) {
+            return false;
+        }
+
+        Lock lock(mutex_);
+        value_ = value;
+        completed_ = true;
+        auto listeners = std::move(listeners_);
+        lock.unlock();
 
-        if (!state->complete) {
-            // Wait for result
-            while (!state->complete) {
-                state->condition.wait(lock);
-            }
+        for (auto& callback : listeners) {

Review comment:
       Yes. So I just revert the unnecessary changes and only move the iteration of listeners outside the locked block.

##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -99,21 +100,24 @@ class Promise {
         }
 
         state->value = value;
-        state->result = Result();
+        state->result = DEFAULT_RESULT;
         state->complete = true;
 
-        typename std::list<ListenerCallback>::iterator it;
-        for (it = state->listeners.begin(); it != state->listeners.end(); ++it) {
-            ListenerCallback& callback = *it;
-            callback(state->result, state->value);
+        const auto listeners = std::move(state->listeners);
+
+        lock.unlock();
+
+        for (auto& callback : listeners) {
+            callback(DEFAULT_RESULT, value);
         }
 
         state->listeners.clear();

Review comment:
       It's right. I just forgot to remove it. (I have removed it from `setFailed`)

##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -99,21 +100,24 @@ class Promise {
         }
 
         state->value = value;
-        state->result = Result();
+        state->result = DEFAULT_RESULT;
         state->complete = true;
 
-        typename std::list<ListenerCallback>::iterator it;
-        for (it = state->listeners.begin(); it != state->listeners.end(); ++it) {
-            ListenerCallback& callback = *it;
-            callback(state->result, state->value);
+        const auto listeners = std::move(state->listeners);
+
+        lock.unlock();
+
+        for (auto& callback : listeners) {
+            callback(DEFAULT_RESULT, value);
         }
 
         state->listeners.clear();

Review comment:
       Sounds reasonable. Ideally we should not use the object that has been moved. I'll adopt the swap way.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org