You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/02 03:05:26 UTC
[pulsar] 01/05: [python-client] Fixed crash when using Python
logger (#10981)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 875bfdfc879bdef09be3c6cebeaa43fdce192ebb
Author: Livio Benčik <lb...@gmail.com>
AuthorDate: Wed Jul 14 09:02:34 2021 +0200
[python-client] Fixed crash when using Python logger (#10981)
### Motivation
In some cases, the Python client would crash when using the new `logger` option. This happens when a Pulsar message is sent asynchronously and the program exits soon afterwards (and even then, not always).
For example, when doing Django migrations which include sending a message:
```
...
[2021-06-19 06:53:57.691] [INFO]: Created connection for pulsar://localhost:6650
[2021-06-19 06:53:57.693] [INFO]: [127.0.0.1:36536 -> 127.0.0.1:6650] Connected to broker
[2021-06-19 06:53:57.695] [INFO]: [persistent://public/default/dashboard-global_context-emit, ] Getting connection from pool
[2021-06-19 06:53:57.707] [INFO]: [persistent://public/default/dashboard-global_context-emit, ] Created producer on broker [127.0.0.1:36536 -> 127.0.0.1:6650]
...
[2021-06-19 06:53:57.728] [DEBUG]: Sending message to topic .....
Applying dashboard.0001_initial... OK
Applying templating.0001_initial... OK
Error in sys.excepthook:
Original exception was:
Failed to migrate dashboard! Return code was: -6
```
This happens because Pulsar tries to log messages after Python has already started finalizing, so the client can't acquire the GIL, which crashes the whole client.
### Modifications
Following the instructions at https://docs.python.org/3/c-api/init.html#c.PyGILState_Ensure, I added a check for when Python is finalizing, and if it is, we fall back to the default console logger (the log level is still respected correctly).
Now it looks like this:
```
...
[2021-06-19 06:45:15.561] [INFO]: Created connection for pulsar://localhost:6650
[2021-06-19 06:45:15.563] [INFO]: [127.0.0.1:35930 -> 127.0.0.1:6650] Connected to broker
[2021-06-19 06:45:15.568] [INFO]: [persistent://public/default/dashboard-global_context-emit, ] Getting connection from pool
[2021-06-19 06:45:15.586] [INFO]: [persistent://public/default/zaba-dashboard-global_context-emit, ] Created producer on broker [127.0.0.1:35930 -> 127.0.0.1:6650]
...
[2021-06-19 06:45:15.604] [DEBUG]: Sending message to topic .....
Applying dashboard.0001_initial... OK
Applying templating.0001_initial... OK
2021-06-19 06:45:16.200 INFO [139853253269312] ClientConnection:1446 | [127.0.0.1:35930 -> 127.0.0.1:6650] Connection closed
2021-06-19 06:45:16.200 ERROR [139853099652672] ClientConnection:531 | [127.0.0.1:35930 -> 127.0.0.1:6650] Read failed: Operation canceled
2021-06-19 06:45:16.201 INFO [139853253269312] ClientConnection:261 | [127.0.0.1:35930 -> 127.0.0.1:6650] Destroyed connection
2021-06-19 06:45:16.201 INFO [139853253269312] ProducerImpl:561 | Producer - [persistent://public/default/dashboard-global_context-emit, standalone-0-120] , [batchMessageContainer = { BatchMessageContainer [size = 0] [bytes = 0] [maxSize = 1000] [maxBytes = 131072] [topicName = persistent://public/default/dashboard-global_context-emit] [numberOfBatchesSent_ = 1] [averageBatchSize_ = 1] }]
Successfully migrated dashboard
```
(cherry picked from commit fc8ce64b1328945ab8e06aad56151294295f003a)
---
pulsar-client-cpp/python/src/config.cc | 65 +++++++++++++++++-----------------
1 file changed, 32 insertions(+), 33 deletions(-)
diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc
index b665ec7..0b30713 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -17,6 +17,7 @@
* under the License.
*/
#include "utils.h"
+#include <pulsar/ConsoleLoggerFactory.h>
template<typename T>
struct ListenerWrapper {
@@ -90,6 +91,7 @@ static ProducerConfiguration& ProducerConfiguration_setCryptoKeyReader(ProducerC
class LoggerWrapper: public Logger {
PyObject* _pyLogger;
+ Logger* fallbackLogger;
int _currentPythonLogLevel = _getLogLevelValue(Logger::LEVEL_INFO);
void _updateCurrentPythonLogLevel() {
@@ -110,26 +112,19 @@ class LoggerWrapper: public Logger {
public:
- LoggerWrapper(const std::string &logger, PyObject* pyLogger) {
+ LoggerWrapper(const std::string &filename, PyObject* pyLogger) {
_pyLogger = pyLogger;
Py_XINCREF(_pyLogger);
- _updateCurrentPythonLogLevel();
- }
-
- LoggerWrapper(const LoggerWrapper& other) {
- _pyLogger = other._pyLogger;
- Py_XINCREF(_pyLogger);
- }
+ std::unique_ptr<LoggerFactory> factory(new ConsoleLoggerFactory());
+ fallbackLogger = factory->getLogger(filename);
- LoggerWrapper& operator=(const LoggerWrapper& other) {
- _pyLogger = other._pyLogger;
- Py_XINCREF(_pyLogger);
- return *this;
+ _updateCurrentPythonLogLevel();
}
virtual ~LoggerWrapper() {
Py_XDECREF(_pyLogger);
+ delete fallbackLogger;
}
bool isEnabled(Level level) {
@@ -137,34 +132,38 @@ class LoggerWrapper: public Logger {
}
void log(Level level, int line, const std::string& message) {
- PyGILState_STATE state = PyGILState_Ensure();
-
- try {
- switch (level) {
- case Logger::LEVEL_DEBUG:
- py::call_method<void>(_pyLogger, "debug", message.c_str());
- break;
- case Logger::LEVEL_INFO:
- py::call_method<void>(_pyLogger, "info", message.c_str());
- break;
- case Logger::LEVEL_WARN:
- py::call_method<void>(_pyLogger, "warning", message.c_str());
- break;
- case Logger::LEVEL_ERROR:
- py::call_method<void>(_pyLogger, "error", message.c_str());
- break;
+ if (Py_IsInitialized() != true) {
+ // Python logger is unavailable - fallback to console logger
+ fallbackLogger->log(level, line, message);
+ } else {
+ PyGILState_STATE state = PyGILState_Ensure();
+
+ try {
+ switch (level) {
+ case Logger::LEVEL_DEBUG:
+ py::call_method<void>(_pyLogger, "debug", message.c_str());
+ break;
+ case Logger::LEVEL_INFO:
+ py::call_method<void>(_pyLogger, "info", message.c_str());
+ break;
+ case Logger::LEVEL_WARN:
+ py::call_method<void>(_pyLogger, "warning", message.c_str());
+ break;
+ case Logger::LEVEL_ERROR:
+ py::call_method<void>(_pyLogger, "error", message.c_str());
+ break;
+ }
+
+ } catch (py::error_already_set e) {
+ PyErr_Print();
}
- } catch (py::error_already_set e) {
- PyErr_Print();
+ PyGILState_Release(state);
}
-
- PyGILState_Release(state);
}
};
class LoggerWrapperFactory : public LoggerFactory {
- static LoggerWrapperFactory* _instance;
PyObject* _pyLogger;
public: