You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2023/03/08 02:44:08 UTC

[pulsar-client-python] branch branch-3.1 updated: Wrap the interruption to a custom exception when a blocking API is interrupted (#99)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new b883f42  Wrap the interruption to a custom exception when a blocking API is interrupted (#99)
b883f42 is described below

commit b883f42aa4287d46423b85f7af77f604cacf2a7e
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Mar 8 00:03:10 2023 +0800

    Wrap the interruption to a custom exception when a blocking API is interrupted (#99)
    
    ### Motivation
    
    Currently, when a blocking API is interrupted by a signal, `SystemError`
    is raised. However, in this case `PyErr_SetInterrupt` is also called,
    and the next time a blocking API is called, `std::system_error` is
    thrown for reasons that are not yet fully understood.
    
    The failure of
    https://lists.apache.org/thread/cmzykd9qz9x1d0s35nc5912o3slwpxpv is
    caused by this issue. The `SystemError` is not caught, so
    `client.close()` is skipped, which leads to the `bad_weak_ptr`
    error.
    
    P.S. Currently we have to call `client.close()` on a `Client` instance;
    otherwise, a `bad_weak_ptr` error will be thrown.
    
    However, even if we caught the `SystemError` like:
    
    ```python
        try:
            msg = consumer.receive()
            # ...
        except SystemError:
            break
    ```
    
    we would still see the following error:
    
    ```
    terminate called after throwing an instance of 'std::system_error'
      what():  Operation not permitted
    Aborted
    ```
    
    ### Modifications
    
    - Wrap `ResultInterrupted` into the `pulsar.Interrupted` exception.
    - Refactor the `waitForAsyncValue` and `waitForAsyncResult` functions
      and raise `pulsar.Interrupted` when `PyErr_CheckSignals` detects a
      signal.
    - Add `InterruptedTest` to cover this case.
    - Remove `future.h` since we now use `std::future` instead of the
      manually implemented `Future`.
    - Fix the `examples/consumer.py` to support stopping by Ctrl+C.
    
    (cherry picked from commit ec05f50bf489aef85532d61f577c62649a5b71a6)
---
 examples/consumer.py                              |  10 +-
 pulsar/exceptions.py                              |   2 +-
 src/client.cc                                     |  61 ++------
 src/consumer.cc                                   |  23 +--
 src/future.h                                      | 181 ----------------------
 src/producer.cc                                   |  14 +-
 src/reader.cc                                     |  10 +-
 src/utils.cc                                      |  37 ++---
 src/utils.h                                       |  62 ++------
 examples/consumer.py => tests/interrupted_test.py |  39 +++--
 tests/run-unit-tests.sh                           |   1 +
 11 files changed, 97 insertions(+), 343 deletions(-)

diff --git a/examples/consumer.py b/examples/consumer.py
index 8c2985e..d698f48 100755
--- a/examples/consumer.py
+++ b/examples/consumer.py
@@ -29,8 +29,12 @@ consumer = client.subscribe('my-topic', "my-subscription",
                             })
 
 while True:
-    msg = consumer.receive()
-    print("Received message '{0}' id='{1}'".format(msg.data().decode('utf-8'), msg.message_id()))
-    consumer.acknowledge(msg)
+    try:
+        msg = consumer.receive()
+        print("Received message '{0}' id='{1}'".format(msg.data().decode('utf-8'), msg.message_id()))
+        consumer.acknowledge(msg)
+    except pulsar.Interrupted:
+        print("Stop receiving messages")
+        break
 
 client.close()
diff --git a/pulsar/exceptions.py b/pulsar/exceptions.py
index d151564..1b425c8 100644
--- a/pulsar/exceptions.py
+++ b/pulsar/exceptions.py
@@ -25,4 +25,4 @@ from _pulsar import PulsarException, UnknownError, InvalidConfiguration, Timeout
     ProducerBlockedQuotaExceededException, ProducerQueueIsFull, MessageTooBig, TopicNotFound, SubscriptionNotFound, \
     ConsumerNotFound, UnsupportedVersionError, TopicTerminated, CryptoError, IncompatibleSchema, ConsumerAssignError, \
     CumulativeAcknowledgementNotAllowedError, TransactionCoordinatorNotFoundError, InvalidTxnStatusError, \
-    NotAllowedError, TransactionConflict, TransactionNotFound, ProducerFenced, MemoryBufferIsFull
+    NotAllowedError, TransactionConflict, TransactionNotFound, ProducerFenced, MemoryBufferIsFull, Interrupted
diff --git a/src/client.cc b/src/client.cc
index 206c4e2..0103309 100644
--- a/src/client.cc
+++ b/src/client.cc
@@ -24,73 +24,38 @@
 namespace py = pybind11;
 
 Producer Client_createProducer(Client& client, const std::string& topic, const ProducerConfiguration& conf) {
-    Producer producer;
-
-    waitForAsyncValue(std::function<void(CreateProducerCallback)>([&](CreateProducerCallback callback) {
-                          client.createProducerAsync(topic, conf, callback);
-                      }),
-                      producer);
-
-    return producer;
+    return waitForAsyncValue<Producer>(
+        [&](CreateProducerCallback callback) { client.createProducerAsync(topic, conf, callback); });
 }
 
 Consumer Client_subscribe(Client& client, const std::string& topic, const std::string& subscriptionName,
                           const ConsumerConfiguration& conf) {
-    Consumer consumer;
-
-    waitForAsyncValue(std::function<void(SubscribeCallback)>([&](SubscribeCallback callback) {
-                          client.subscribeAsync(topic, subscriptionName, conf, callback);
-                      }),
-                      consumer);
-
-    return consumer;
+    return waitForAsyncValue<Consumer>(
+        [&](SubscribeCallback callback) { client.subscribeAsync(topic, subscriptionName, conf, callback); });
 }
 
 Consumer Client_subscribe_topics(Client& client, const std::vector<std::string>& topics,
                                  const std::string& subscriptionName, const ConsumerConfiguration& conf) {
-    Consumer consumer;
-
-    waitForAsyncValue(std::function<void(SubscribeCallback)>([&](SubscribeCallback callback) {
-                          client.subscribeAsync(topics, subscriptionName, conf, callback);
-                      }),
-                      consumer);
-
-    return consumer;
+    return waitForAsyncValue<Consumer>(
+        [&](SubscribeCallback callback) { client.subscribeAsync(topics, subscriptionName, conf, callback); });
 }
 
 Consumer Client_subscribe_pattern(Client& client, const std::string& topic_pattern,
                                   const std::string& subscriptionName, const ConsumerConfiguration& conf) {
-    Consumer consumer;
-
-    waitForAsyncValue(std::function<void(SubscribeCallback)>([&](SubscribeCallback callback) {
-                          client.subscribeWithRegexAsync(topic_pattern, subscriptionName, conf, callback);
-                      }),
-                      consumer);
-
-    return consumer;
+    return waitForAsyncValue<Consumer>([&](SubscribeCallback callback) {
+        client.subscribeWithRegexAsync(topic_pattern, subscriptionName, conf, callback);
+    });
 }
 
 Reader Client_createReader(Client& client, const std::string& topic, const MessageId& startMessageId,
                            const ReaderConfiguration& conf) {
-    Reader reader;
-
-    waitForAsyncValue(std::function<void(ReaderCallback)>([&](ReaderCallback callback) {
-                          client.createReaderAsync(topic, startMessageId, conf, callback);
-                      }),
-                      reader);
-
-    return reader;
+    return waitForAsyncValue<Reader>(
+        [&](ReaderCallback callback) { client.createReaderAsync(topic, startMessageId, conf, callback); });
 }
 
 std::vector<std::string> Client_getTopicPartitions(Client& client, const std::string& topic) {
-    std::vector<std::string> partitions;
-
-    waitForAsyncValue(std::function<void(GetPartitionsCallback)>([&](GetPartitionsCallback callback) {
-                          client.getPartitionsForTopicAsync(topic, callback);
-                      }),
-                      partitions);
-
-    return partitions;
+    return waitForAsyncValue<std::vector<std::string>>(
+        [&](GetPartitionsCallback callback) { client.getPartitionsForTopicAsync(topic, callback); });
 }
 
 void Client_close(Client& client) {
diff --git a/src/consumer.cc b/src/consumer.cc
index 972bd0b..4b44775 100644
--- a/src/consumer.cc
+++ b/src/consumer.cc
@@ -29,13 +29,7 @@ void Consumer_unsubscribe(Consumer& consumer) {
 }
 
 Message Consumer_receive(Consumer& consumer) {
-    Message msg;
-
-    waitForAsyncValue(std::function<void(ReceiveCallback)>(
-                          [&consumer](ReceiveCallback callback) { consumer.receiveAsync(callback); }),
-                      msg);
-
-    return msg;
+    return waitForAsyncValue<Message>([&](ReceiveCallback callback) { consumer.receiveAsync(callback); });
 }
 
 Message Consumer_receive_timeout(Consumer& consumer, int timeoutMs) {
@@ -59,32 +53,27 @@ Messages Consumer_batch_receive(Consumer& consumer) {
 void Consumer_acknowledge(Consumer& consumer, const Message& msg) { consumer.acknowledgeAsync(msg, nullptr); }
 
 void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) {
-    Py_BEGIN_ALLOW_THREADS
-    consumer.acknowledgeAsync(msgId, nullptr);
+    Py_BEGIN_ALLOW_THREADS consumer.acknowledgeAsync(msgId, nullptr);
     Py_END_ALLOW_THREADS
 }
 
 void Consumer_negative_acknowledge(Consumer& consumer, const Message& msg) {
-    Py_BEGIN_ALLOW_THREADS
-    consumer.negativeAcknowledge(msg);
+    Py_BEGIN_ALLOW_THREADS consumer.negativeAcknowledge(msg);
     Py_END_ALLOW_THREADS
 }
 
 void Consumer_negative_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) {
-    Py_BEGIN_ALLOW_THREADS
-    consumer.negativeAcknowledge(msgId);
+    Py_BEGIN_ALLOW_THREADS consumer.negativeAcknowledge(msgId);
     Py_END_ALLOW_THREADS
 }
 
 void Consumer_acknowledge_cumulative(Consumer& consumer, const Message& msg) {
-    Py_BEGIN_ALLOW_THREADS
-    consumer.acknowledgeCumulativeAsync(msg, nullptr);
+    Py_BEGIN_ALLOW_THREADS consumer.acknowledgeCumulativeAsync(msg, nullptr);
     Py_END_ALLOW_THREADS
 }
 
 void Consumer_acknowledge_cumulative_message_id(Consumer& consumer, const MessageId& msgId) {
-    Py_BEGIN_ALLOW_THREADS
-    consumer.acknowledgeCumulativeAsync(msgId, nullptr);
+    Py_BEGIN_ALLOW_THREADS consumer.acknowledgeCumulativeAsync(msgId, nullptr);
     Py_END_ALLOW_THREADS
 }
 
diff --git a/src/future.h b/src/future.h
deleted file mode 100644
index 6754c89..0000000
--- a/src/future.h
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#ifndef LIB_FUTURE_H_
-#define LIB_FUTURE_H_
-
-#include <functional>
-#include <mutex>
-#include <memory>
-#include <condition_variable>
-
-#include <list>
-
-typedef std::unique_lock<std::mutex> Lock;
-
-namespace pulsar {
-
-template <typename Result, typename Type>
-struct InternalState {
-    std::mutex mutex;
-    std::condition_variable condition;
-    Result result;
-    Type value;
-    bool complete;
-
-    std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
-   public:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
-
-    Future& addListener(ListenerCallback callback) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
-
-        if (state->complete) {
-            lock.unlock();
-            callback(state->result, state->value);
-        } else {
-            state->listeners.push_back(callback);
-        }
-
-        return *this;
-    }
-
-    Result get(Type& result) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
-
-        if (!state->complete) {
-            // Wait for result
-            while (!state->complete) {
-                state->condition.wait(lock);
-            }
-        }
-
-        result = state->value;
-        return state->result;
-    }
-
-    template <typename Duration>
-    bool get(Result& res, Type& value, Duration d) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
-
-        if (!state->complete) {
-            // Wait for result
-            while (!state->complete) {
-                if (!state->condition.wait_for(lock, d, [&state] { return state->complete; })) {
-                    // Timeout while waiting for the future to complete
-                    return false;
-                }
-            }
-        }
-
-        value = state->value;
-        res = state->result;
-        return true;
-    }
-
-   private:
-    typedef std::shared_ptr<InternalState<Result, Type> > InternalStatePtr;
-    Future(InternalStatePtr state) : state_(state) {}
-
-    std::shared_ptr<InternalState<Result, Type> > state_;
-
-    template <typename U, typename V>
-    friend class Promise;
-};
-
-template <typename Result, typename Type>
-class Promise {
-   public:
-    Promise() : state_(std::make_shared<InternalState<Result, Type> >()) {}
-
-    bool setValue(const Type& value) const {
-        static Result DEFAULT_RESULT;
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
-
-        if (state->complete) {
-            return false;
-        }
-
-        state->value = value;
-        state->result = DEFAULT_RESULT;
-        state->complete = true;
-
-        decltype(state->listeners) listeners;
-        listeners.swap(state->listeners);
-
-        lock.unlock();
-
-        for (auto& callback : listeners) {
-            callback(DEFAULT_RESULT, value);
-        }
-
-        state->condition.notify_all();
-        return true;
-    }
-
-    bool setFailed(Result result) const {
-        static Type DEFAULT_VALUE;
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
-
-        if (state->complete) {
-            return false;
-        }
-
-        state->result = result;
-        state->complete = true;
-
-        decltype(state->listeners) listeners;
-        listeners.swap(state->listeners);
-
-        lock.unlock();
-
-        for (auto& callback : listeners) {
-            callback(result, DEFAULT_VALUE);
-        }
-
-        state->condition.notify_all();
-        return true;
-    }
-
-    bool isComplete() const {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
-        return state->complete;
-    }
-
-    Future<Result, Type> getFuture() const { return Future<Result, Type>(state_); }
-
-   private:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
-    std::shared_ptr<InternalState<Result, Type> > state_;
-};
-
-class Void {};
-
-} /* namespace pulsar */
-
-#endif /* LIB_FUTURE_H_ */
diff --git a/src/producer.cc b/src/producer.cc
index 1dd5a76..7027185 100644
--- a/src/producer.cc
+++ b/src/producer.cc
@@ -25,21 +25,15 @@
 namespace py = pybind11;
 
 MessageId Producer_send(Producer& producer, const Message& message) {
-    MessageId messageId;
-
-    waitForAsyncValue(std::function<void(SendCallback)>(
-                          [&](SendCallback callback) { producer.sendAsync(message, callback); }),
-                      messageId);
-
-    return messageId;
+    return waitForAsyncValue<MessageId>(
+        [&](SendCallback callback) { producer.sendAsync(message, callback); });
 }
 
 void Producer_sendAsync(Producer& producer, const Message& msg, SendCallback callback) {
-    Py_BEGIN_ALLOW_THREADS
-    producer.sendAsync(msg, callback);
+    Py_BEGIN_ALLOW_THREADS producer.sendAsync(msg, callback);
     Py_END_ALLOW_THREADS
 
-    if (PyErr_CheckSignals() == -1) {
+        if (PyErr_CheckSignals() == -1) {
         PyErr_SetInterrupt();
     }
 }
diff --git a/src/reader.cc b/src/reader.cc
index 7194c29..0126f3f 100644
--- a/src/reader.cc
+++ b/src/reader.cc
@@ -62,14 +62,8 @@ Message Reader_readNextTimeout(Reader& reader, int timeoutMs) {
 }
 
 bool Reader_hasMessageAvailable(Reader& reader) {
-    bool available = false;
-
-    waitForAsyncValue(
-        std::function<void(HasMessageAvailableCallback)>(
-            [&](HasMessageAvailableCallback callback) { reader.hasMessageAvailableAsync(callback); }),
-        available);
-
-    return available;
+    return waitForAsyncValue<bool>(
+        [&](HasMessageAvailableCallback callback) { reader.hasMessageAvailableAsync(callback); });
 }
 
 void Reader_close(Reader& reader) {
diff --git a/src/utils.cc b/src/utils.cc
index cf8f6f4..8ebc3f9 100644
--- a/src/utils.cc
+++ b/src/utils.cc
@@ -20,28 +20,29 @@
 #include "utils.h"
 
 void waitForAsyncResult(std::function<void(ResultCallback)> func) {
-    Result res = ResultOk;
-    bool b;
-    Promise<bool, Result> promise;
-    Future<bool, Result> future = promise.getFuture();
+    auto promise = std::make_shared<std::promise<Result>>();
+    func([promise](Result result) { promise->set_value(result); });
+    internal::waitForResult(*promise);
+}
 
-    Py_BEGIN_ALLOW_THREADS func(WaitForCallback(promise));
-    Py_END_ALLOW_THREADS
+namespace internal {
 
-        bool isComplete;
+void waitForResult(std::promise<pulsar::Result>& promise) {
+    auto future = promise.get_future();
     while (true) {
-        // Check periodically for Python signals
-        Py_BEGIN_ALLOW_THREADS isComplete = future.get(b, std::ref(res), std::chrono::milliseconds(100));
-        Py_END_ALLOW_THREADS
-
-            if (isComplete) {
-            CHECK_RESULT(res);
-            return;
+        {
+            py::gil_scoped_release release;
+            auto status = future.wait_for(std::chrono::milliseconds(100));
+            if (status == std::future_status::ready) {
+                CHECK_RESULT(future.get());
+                return;
+            }
         }
-
-        if (PyErr_CheckSignals() == -1) {
-            PyErr_SetInterrupt();
-            return;
+        py::gil_scoped_acquire acquire;
+        if (PyErr_CheckSignals() != 0) {
+            raiseException(ResultInterrupted);
         }
     }
 }
+
+}  // namespace internal
diff --git a/src/utils.h b/src/utils.h
index fb700c6..bbe202e 100644
--- a/src/utils.h
+++ b/src/utils.h
@@ -21,12 +21,14 @@
 
 #include <pulsar/Client.h>
 #include <pulsar/MessageBatch.h>
+#include <chrono>
 #include <exception>
-#include <Python.h>
+#include <future>
+#include <pybind11/pybind11.h>
 #include "exceptions.h"
-#include "future.h"
 
 using namespace pulsar;
+namespace py = pybind11;
 
 inline void CHECK_RESULT(Result res) {
     if (res != ResultOk) {
@@ -34,56 +36,26 @@ inline void CHECK_RESULT(Result res) {
     }
 }
 
-struct WaitForCallback {
-    Promise<bool, Result> m_promise;
+namespace internal {
 
-    WaitForCallback(Promise<bool, Result> promise) : m_promise(promise) {}
+void waitForResult(std::promise<pulsar::Result>& promise);
 
-    void operator()(Result result) { m_promise.setValue(result); }
-};
-
-template <typename T>
-struct WaitForCallbackValue {
-    Promise<Result, T>& m_promise;
-
-    WaitForCallbackValue(Promise<Result, T>& promise) : m_promise(promise) {}
-
-    void operator()(Result result, const T& value) {
-        if (result == ResultOk) {
-            m_promise.setValue(value);
-        } else {
-            m_promise.setFailed(result);
-        }
-    }
-};
+}  // namespace internal
 
 void waitForAsyncResult(std::function<void(ResultCallback)> func);
 
-template <typename T, typename Callback>
-inline void waitForAsyncValue(std::function<void(Callback)> func, T& value) {
-    Result res = ResultOk;
-    Promise<Result, T> promise;
-    Future<Result, T> future = promise.getFuture();
-
-    Py_BEGIN_ALLOW_THREADS func(WaitForCallbackValue<T>(promise));
-    Py_END_ALLOW_THREADS
+template <typename T>
+inline T waitForAsyncValue(std::function<void(std::function<void(Result, const T&)>)> func) {
+    auto resultPromise = std::make_shared<std::promise<Result>>();
+    auto valuePromise = std::make_shared<std::promise<T>>();
 
-        bool isComplete;
-    while (true) {
-        // Check periodically for Python signals
-        Py_BEGIN_ALLOW_THREADS isComplete = future.get(res, std::ref(value), std::chrono::milliseconds(100));
-        Py_END_ALLOW_THREADS
+    func([resultPromise, valuePromise](Result result, const T& value) {
+        valuePromise->set_value(value);
+        resultPromise->set_value(result);
+    });
 
-            if (isComplete) {
-            CHECK_RESULT(res);
-            return;
-        }
-
-        if (PyErr_CheckSignals() == -1) {
-            PyErr_SetInterrupt();
-            return;
-        }
-    }
+    internal::waitForResult(*resultPromise);
+    return valuePromise->get_future().get();
 }
 
 struct CryptoKeyReaderWrapper {
diff --git a/examples/consumer.py b/tests/interrupted_test.py
old mode 100755
new mode 100644
similarity index 50%
copy from examples/consumer.py
copy to tests/interrupted_test.py
index 8c2985e..6d61f99
--- a/examples/consumer.py
+++ b/tests/interrupted_test.py
@@ -18,19 +18,34 @@
 # under the License.
 #
 
-
+from unittest import TestCase, main
 import pulsar
+import signal
+import time
+import threading
+
+class InterruptedTest(TestCase):
+
+    service_url = 'pulsar://localhost:6650'
 
-client = pulsar.Client('pulsar://localhost:6650')
-consumer = client.subscribe('my-topic', "my-subscription",
-                            properties={
-                                "consumer-name": "test-consumer-name",
-                                "consumer-id": "test-consumer-id"
-                            })
+    def test_sigint(self):
+        def thread_function():
+            time.sleep(1)
+            signal.raise_signal(signal.SIGINT)
 
-while True:
-    msg = consumer.receive()
-    print("Received message '{0}' id='{1}'".format(msg.data().decode('utf-8'), msg.message_id()))
-    consumer.acknowledge(msg)
+        client = pulsar.Client(self.service_url)
+        consumer = client.subscribe('test-sigint', "my-sub")
+        thread = threading.Thread(target=thread_function)
+        thread.start()
+        
+        start = time.time()
+        with self.assertRaises(pulsar.Interrupted):
+            consumer.receive()
+        finish = time.time()
+        print(f"time: {finish - start}")
+        self.assertGreater(finish - start, 1)
+        self.assertLess(finish - start, 1.5)
+        client.close()
 
-client.close()
+if __name__ == '__main__':
+    main()
diff --git a/tests/run-unit-tests.sh b/tests/run-unit-tests.sh
index 13349f9..5168f94 100755
--- a/tests/run-unit-tests.sh
+++ b/tests/run-unit-tests.sh
@@ -24,4 +24,5 @@ ROOT_DIR=$(git rev-parse --show-toplevel)
 cd $ROOT_DIR/tests
 
 python3 custom_logger_test.py
+python3 interrupted_test.py
 python3 pulsar_test.py