You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/02 03:05:26 UTC

[pulsar] 01/05: [python-client] Fixed crash when using Python logger (#10981)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 875bfdfc879bdef09be3c6cebeaa43fdce192ebb
Author: Livio Benčik <lb...@gmail.com>
AuthorDate: Wed Jul 14 09:02:34 2021 +0200

    [python-client] Fixed crash when using Python logger (#10981)
    
    ### Motivation
    
    In some cases, the Python client would crash when using the new `logger` option. This happens when a Pulsar message is sent asynchronously and the program exits soon afterwards (and even then, not always).
    
    For example, when doing Django migrations which include sending a message:
    ```
    ...
    [2021-06-19 06:53:57.691] [INFO]: Created connection for pulsar://localhost:6650
    [2021-06-19 06:53:57.693] [INFO]: [127.0.0.1:36536 -> 127.0.0.1:6650] Connected to broker
    [2021-06-19 06:53:57.695] [INFO]: [persistent://public/default/dashboard-global_context-emit, ] Getting connection from pool
    [2021-06-19 06:53:57.707] [INFO]: [persistent://public/default/dashboard-global_context-emit, ] Created producer on broker [127.0.0.1:36536 -> 127.0.0.1:6650]
    ...
    [2021-06-19 06:53:57.728] [DEBUG]: Sending message to topic .....
      Applying dashboard.0001_initial... OK
      Applying templating.0001_initial... OK
    Error in sys.excepthook:
    
    Original exception was:
    Failed to migrate dashboard! Return code was: -6
    ```
    
    This happens because Pulsar tries to log messages after Python has already started finalizing, so the client cannot acquire the GIL, which crashes the whole client.
    
    ### Modifications
    
    Following the instructions at https://docs.python.org/3/c-api/init.html#c.PyGILState_Ensure, I added a check (using `Py_IsInitialized()`) for when Python is finalizing; if it is, we fall back to the default console logger (the log level is still respected correctly).
    
    With this fix, the output looks like this:
    ```
    ...
    [2021-06-19 06:45:15.561] [INFO]: Created connection for pulsar://localhost:6650
    [2021-06-19 06:45:15.563] [INFO]: [127.0.0.1:35930 -> 127.0.0.1:6650] Connected to broker
    [2021-06-19 06:45:15.568] [INFO]: [persistent://public/default/dashboard-global_context-emit, ] Getting connection from pool
    [2021-06-19 06:45:15.586] [INFO]: [persistent://public/default/zaba-dashboard-global_context-emit, ] Created producer on broker [127.0.0.1:35930 -> 127.0.0.1:6650]
    ...
    [2021-06-19 06:45:15.604] [DEBUG]: Sending message to topic .....
      Applying dashboard.0001_initial... OK
      Applying templating.0001_initial... OK
    2021-06-19 06:45:16.200 INFO  [139853253269312] ClientConnection:1446 | [127.0.0.1:35930 -> 127.0.0.1:6650] Connection closed
    2021-06-19 06:45:16.200 ERROR [139853099652672] ClientConnection:531 | [127.0.0.1:35930 -> 127.0.0.1:6650] Read failed: Operation canceled
    2021-06-19 06:45:16.201 INFO  [139853253269312] ClientConnection:261 | [127.0.0.1:35930 -> 127.0.0.1:6650] Destroyed connection
    2021-06-19 06:45:16.201 INFO  [139853253269312] ProducerImpl:561 | Producer - [persistent://public/default/dashboard-global_context-emit, standalone-0-120] , [batchMessageContainer = { BatchMessageContainer [size = 0] [bytes = 0] [maxSize = 1000] [maxBytes = 131072] [topicName = persistent://public/default/dashboard-global_context-emit] [numberOfBatchesSent_ = 1] [averageBatchSize_ = 1] }]
    Successfully migrated dashboard
    ```
    
    (cherry picked from commit fc8ce64b1328945ab8e06aad56151294295f003a)
---
 pulsar-client-cpp/python/src/config.cc | 65 +++++++++++++++++-----------------
 1 file changed, 32 insertions(+), 33 deletions(-)

diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc
index b665ec7..0b30713 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -17,6 +17,7 @@
  * under the License.
  */
 #include "utils.h"
+#include <pulsar/ConsoleLoggerFactory.h>
 
 template<typename T>
 struct ListenerWrapper {
@@ -90,6 +91,7 @@ static ProducerConfiguration& ProducerConfiguration_setCryptoKeyReader(ProducerC
 
 class LoggerWrapper: public Logger {
     PyObject* _pyLogger;
+    Logger* fallbackLogger;
     int _currentPythonLogLevel = _getLogLevelValue(Logger::LEVEL_INFO);
 
     void _updateCurrentPythonLogLevel() {
@@ -110,26 +112,19 @@ class LoggerWrapper: public Logger {
 
    public:
 
-    LoggerWrapper(const std::string &logger, PyObject* pyLogger) {
+    LoggerWrapper(const std::string &filename, PyObject* pyLogger) {
         _pyLogger = pyLogger;
         Py_XINCREF(_pyLogger);
 
-        _updateCurrentPythonLogLevel();
-    }
-
-    LoggerWrapper(const LoggerWrapper& other) {
-        _pyLogger = other._pyLogger;
-        Py_XINCREF(_pyLogger);
-    }
+        std::unique_ptr<LoggerFactory> factory(new ConsoleLoggerFactory());
+        fallbackLogger = factory->getLogger(filename);
 
-    LoggerWrapper& operator=(const LoggerWrapper& other) {
-        _pyLogger = other._pyLogger;
-        Py_XINCREF(_pyLogger);
-        return *this;
+        _updateCurrentPythonLogLevel();
     }
 
     virtual ~LoggerWrapper() {
         Py_XDECREF(_pyLogger);
+        delete fallbackLogger;
     }
 
     bool isEnabled(Level level) {
@@ -137,34 +132,38 @@ class LoggerWrapper: public Logger {
     }
 
     void log(Level level, int line, const std::string& message) {
-        PyGILState_STATE state = PyGILState_Ensure();
-
-        try {
-            switch (level) {
-                case Logger::LEVEL_DEBUG:
-                    py::call_method<void>(_pyLogger, "debug", message.c_str());
-                    break;
-                case Logger::LEVEL_INFO:
-                    py::call_method<void>(_pyLogger, "info", message.c_str());
-                    break;
-                case Logger::LEVEL_WARN:
-                    py::call_method<void>(_pyLogger, "warning", message.c_str());
-                    break;
-                case Logger::LEVEL_ERROR:
-                    py::call_method<void>(_pyLogger, "error", message.c_str());
-                    break;
+        if (Py_IsInitialized() != true) {
+            // Python logger is unavailable - fallback to console logger
+            fallbackLogger->log(level, line, message);
+        } else {
+            PyGILState_STATE state = PyGILState_Ensure();
+
+            try {
+                switch (level) {
+                    case Logger::LEVEL_DEBUG:
+                        py::call_method<void>(_pyLogger, "debug", message.c_str());
+                        break;
+                    case Logger::LEVEL_INFO:
+                        py::call_method<void>(_pyLogger, "info", message.c_str());
+                        break;
+                    case Logger::LEVEL_WARN:
+                        py::call_method<void>(_pyLogger, "warning", message.c_str());
+                        break;
+                    case Logger::LEVEL_ERROR:
+                        py::call_method<void>(_pyLogger, "error", message.c_str());
+                        break;
+                }
+
+            } catch (py::error_already_set e) {
+                PyErr_Print();
             }
 
-        } catch (py::error_already_set e) {
-            PyErr_Print();
+            PyGILState_Release(state);
         }
-
-        PyGILState_Release(state);
     }
 };
 
 class LoggerWrapperFactory : public LoggerFactory {
-    static LoggerWrapperFactory* _instance;
     PyObject* _pyLogger;
 
    public: