You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/09 08:00:54 UTC

[pulsar] 02/09: [Python] Handle py::call_method error without mutating internal state (#11840)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3129c9634eb2f3c155ae46170ab16710912288ea
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Sep 1 00:48:25 2021 +0800

    [Python] Handle py::call_method error without mutating internal state (#11840)
    
    Fixes #11823
    
    When the Python logger is customized with underlying `LoggerWrapper` objects, `async` Python functions may sometimes return an incorrect value such as `None`. This is caused by a bug (or feature?) in Boost.Python: `py::call_method` fails when it is called from a C++ object's destructor. See https://github.com/boostorg/python/issues/374 for details.
    
    In the code example from #11823, the failure happens in `ConsumerImpl`'s destructor: the logger for `AckGroupingTrackerEnabled` is created again there, because the logger is thread-local and is created anew in each new thread. In that case, `py::call_method` in `LoggerWrapper#_updateCurrentPythonLogLevel` fails, `PyErr_Print` is called, and the error indicator is cleared, which causes the `async` function's result to become `None`.
    
    - Reduce unnecessary `Logger.getEffectiveLevel` calls: get the Python log level only once, when the logger factory is initialized, and pass that same level to all loggers.
    - Remove the `PyErr_Print` calls in `LoggerWrapper`-related code. When `py::call_method` fails, use the fallback logger to print logs.
    - Add a separate test file for the custom logger, because once the `LoggerFactory` is set, all other tests would be affected.
    
    - [x] Make sure that the change passes the CI checks.
    
    This change adds the test `CustomLoggingTest`. Since the `asyncio` module was introduced in Python 3.3 while CI is based on Python 2.7, this test cannot be run by CI until a Python 3 based CI is added.
    
    (cherry picked from commit 9153e7131736decf526684a3bddc4d3f7e307ce3)
---
 pulsar-client-cpp/python/custom_logger_test.py | 54 +++++++++++++++++++
 pulsar-client-cpp/python/pulsar_test.py        |  5 --
 pulsar-client-cpp/python/src/config.cc         | 72 +++++++++++++++-----------
 pulsar-client-cpp/run-unit-tests.sh            |  9 ++++
 4 files changed, 105 insertions(+), 35 deletions(-)

diff --git a/pulsar-client-cpp/python/custom_logger_test.py b/pulsar-client-cpp/python/custom_logger_test.py
new file mode 100644
index 0000000..de0600cc
--- /dev/null
+++ b/pulsar-client-cpp/python/custom_logger_test.py
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from unittest import TestCase, main
+import asyncio
+import logging
+from pulsar import Client
+
+class CustomLoggingTest(TestCase):
+
+    serviceUrl = 'pulsar://localhost:6650'
+
+    def test_async_func_with_custom_logger(self):
+        # boost::python::call may fail in C++ destructors; even worse, calls
+        # to PyErr_Print could corrupt the Python interpreter.
+        # See https://github.com/boostorg/python/issues/374 for details.
+        # This test is to verify these functions won't be called in C++ destructors
+        # so that Python's async function works well.
+        client = Client(
+            self.serviceUrl,
+            logger=logging.getLogger('custom-logger')
+        )
+
+        async def async_get(value):
+            consumer = client.subscribe('test_async_get', 'sub')
+            consumer.close()
+            return value
+
+        value = 'foo'
+        result = asyncio.run(async_get(value))
+        self.assertEqual(value, result)
+
+        client.close()
+
+if __name__ == '__main__':
+    logging.basicConfig(encoding='utf-8', level=logging.DEBUG)
+    main()
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index 4a8ca03..7c85f77 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -19,7 +19,6 @@
 #
 
 
-import logging
 from unittest import TestCase, main
 import time
 import os
@@ -104,10 +103,6 @@ class PulsarTest(TestCase):
         conf.replicate_subscription_state_enabled(True)
         self.assertEqual(conf.replicate_subscription_state_enabled(), True)
 
-    def test_client_logger(self):
-        logger = logging.getLogger("pulsar")
-        Client(self.serviceUrl, logger=logger)
-        
     def test_connect_error(self):
         with self.assertRaises(pulsar.ConnectError):
             client = Client('fakeServiceUrl')
diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc
index 0b30713..c34e818 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -18,6 +18,8 @@
  */
 #include "utils.h"
 #include <pulsar/ConsoleLoggerFactory.h>
+#include "lib/Utils.h"
+#include <memory>
 
 template<typename T>
 struct ListenerWrapper {
@@ -90,51 +92,40 @@ static ProducerConfiguration& ProducerConfiguration_setCryptoKeyReader(ProducerC
 }
 
 class LoggerWrapper: public Logger {
-    PyObject* _pyLogger;
-    Logger* fallbackLogger;
-    int _currentPythonLogLevel = _getLogLevelValue(Logger::LEVEL_INFO);
-
-    void _updateCurrentPythonLogLevel() {
-        PyGILState_STATE state = PyGILState_Ensure();
-
-        try {
-            _currentPythonLogLevel = py::call_method<int>(_pyLogger, "getEffectiveLevel");
-        } catch (py::error_already_set e) {
-            PyErr_Print();
-        }
-
-        PyGILState_Release(state);
-    };
+    PyObject* const _pyLogger;
+    const int _pythonLogLevel;
+    const std::unique_ptr<Logger> _fallbackLogger;
 
-    int _getLogLevelValue(Level level) {
+    static constexpr int _getLogLevelValue(Level level) {
         return 10 + (level * 10);
     }
 
    public:
 
-    LoggerWrapper(const std::string &filename, PyObject* pyLogger) {
-        _pyLogger = pyLogger;
+    LoggerWrapper(PyObject* pyLogger, int pythonLogLevel, Logger* fallbackLogger)
+        : _pyLogger(pyLogger),
+          _pythonLogLevel(pythonLogLevel),
+          _fallbackLogger(fallbackLogger) {
         Py_XINCREF(_pyLogger);
-
-        std::unique_ptr<LoggerFactory> factory(new ConsoleLoggerFactory());
-        fallbackLogger = factory->getLogger(filename);
-
-        _updateCurrentPythonLogLevel();
     }
 
+    LoggerWrapper(const LoggerWrapper&) = delete;
+    LoggerWrapper(LoggerWrapper&&) noexcept = delete;
+    LoggerWrapper& operator=(const LoggerWrapper&) = delete;
+    LoggerWrapper& operator=(LoggerWrapper&&) = delete;
+
     virtual ~LoggerWrapper() {
         Py_XDECREF(_pyLogger);
-        delete fallbackLogger;
     }
 
     bool isEnabled(Level level) {
-        return _getLogLevelValue(level) >= _currentPythonLogLevel;
+        return _getLogLevelValue(level) >= _pythonLogLevel;
     }
 
     void log(Level level, int line, const std::string& message) {
-        if (Py_IsInitialized() != true) {
+        if (!Py_IsInitialized()) {
             // Python logger is unavailable - fallback to console logger
-            fallbackLogger->log(level, line, message);
+            _fallbackLogger->log(level, line, message);
         } else {
             PyGILState_STATE state = PyGILState_Ensure();
 
@@ -153,9 +144,8 @@ class LoggerWrapper: public Logger {
                         py::call_method<void>(_pyLogger, "error", message.c_str());
                         break;
                 }
-
             } catch (py::error_already_set e) {
-                PyErr_Print();
+                _fallbackLogger->log(level, line, message);
             }
 
             PyGILState_Release(state);
@@ -164,12 +154,29 @@ class LoggerWrapper: public Logger {
 };
 
 class LoggerWrapperFactory : public LoggerFactory {
+    std::unique_ptr<LoggerFactory> _fallbackLoggerFactory{new ConsoleLoggerFactory};
     PyObject* _pyLogger;
+    Optional<int> _pythonLogLevel{Optional<int>::empty()};
+
+    void initializePythonLogLevel() {
+        PyGILState_STATE state = PyGILState_Ensure();
+
+        try {
+            int level = py::call_method<int>(_pyLogger, "getEffectiveLevel");
+            _pythonLogLevel = Optional<int>::of(level);
+        } catch (const py::error_already_set& e) {
+            // Failed to get the log level from _pyLogger; leave it empty so that we fall back to _fallbackLogger
+            _pythonLogLevel = Optional<int>::empty();
+        }
+
+        PyGILState_Release(state);
+    }
 
    public:
     LoggerWrapperFactory(py::object pyLogger) {
         _pyLogger = pyLogger.ptr();
         Py_XINCREF(_pyLogger);
+        initializePythonLogLevel();
     }
 
     virtual ~LoggerWrapperFactory() {
@@ -177,7 +184,12 @@ class LoggerWrapperFactory : public LoggerFactory {
     }
 
     Logger* getLogger(const std::string &fileName) {
-        return new LoggerWrapper(fileName, _pyLogger);
+        const auto fallbackLogger = _fallbackLoggerFactory->getLogger(fileName);
+        if (_pythonLogLevel.is_present()) {
+            return new LoggerWrapper(_pyLogger, _pythonLogLevel.value(), fallbackLogger);
+        } else {
+            return fallbackLogger;
+        }
     }
 };
 
diff --git a/pulsar-client-cpp/run-unit-tests.sh b/pulsar-client-cpp/run-unit-tests.sh
index 7e40a49..b8d6a8e 100755
--- a/pulsar-client-cpp/run-unit-tests.sh
+++ b/pulsar-client-cpp/run-unit-tests.sh
@@ -73,12 +73,21 @@ if [ $RES -eq 0 ]; then
     cp *_test.py /tmp
     pushd /tmp
 
+    # TODO: this test requires the asyncio module, which is supported by Python >= 3.3.
+    #  However, CI doesn't support Python3 yet; we should uncomment the following
+    #  lines after a Python3 CI script is added.
+    #python custom_logger_test.py
+    #RES=$?
+    #echo "custom_logger_test.py: $RES"
+
     python pulsar_test.py
     RES=$?
+    echo "pulsar_test.py: $RES"
 
     echo "---- Running Python Function Instance unit tests"
     bash $ROOT_DIR/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh
     RES=$?
+    echo "run_python_instance_tests.sh: $RES"
 
     popd
     popd