You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2021/08/31 16:49:11 UTC

[pulsar] branch master updated: [Python] Handle py::call_method error without mutating internal state (#11840)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9153e71  [Python] Handle py::call_method error without mutating internal state (#11840)
9153e71 is described below

commit 9153e7131736decf526684a3bddc4d3f7e307ce3
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Sep 1 00:48:25 2021 +0800

    [Python] Handle py::call_method error without mutating internal state (#11840)
    
    Fixes #11823
    
    ### Motivation
    
    When a custom Python logger is configured (the C++ client wraps it in `LoggerWrapper` objects), `async` Python functions may sometimes return an incorrect value such as `None`. This is caused by a bug (or feature?) of Boost.Python: `py::call_method` fails when it is called from a C++ object's destructor. See https://github.com/boostorg/python/issues/374 for details.
    
    For the code example in #11823, the cause is that, in `ConsumerImpl`'s destructor, the logger for `AckGroupingTrackerEnabled` is created again: loggers are thread-local, so a new one is created in each new thread. In this case, `py::call_method` in `LoggerWrapper#_updateCurrentPythonLogLevel` fails and `PyErr_Print` is called, which clears the Python error indicator. As a result, the `async` function's result becomes `None`.
    
    ### Modifications
    
    - Reduce unnecessary `Logger.getEffectiveLevel` calls to get the Python log level. Instead of querying it for every logger, get the log level once when the logger factory is initialized and pass the same level to all loggers.
    - Remove the `PyErr_Print` calls from the `LoggerWrapper`-related code. When `py::call_method` fails, use the fallback logger to print logs instead.
    - Add a separate test for the custom logger (and remove `test_client_logger` from `pulsar_test.py`), because once the `LoggerFactory` is set, all other tests would be affected.
    
    ### Verifying this change
    
    - [x] Make sure that the change passes the CI checks.
    
    This change adds the test `CustomLoggingTest`. The test requires Python 3: it uses `asyncio.run` (available since Python 3.7) and `logging.basicConfig(encoding=...)` (available since Python 3.9). Because CI is based on Python 2.7, this test cannot run in CI until a Python 3 based CI is added.
---
 pulsar-client-cpp/python/custom_logger_test.py | 54 ++++++++++++++++++++
 pulsar-client-cpp/python/pulsar_test.py        |  5 --
 pulsar-client-cpp/python/src/config.cc         | 71 +++++++++++++++-----------
 pulsar-client-cpp/run-unit-tests.sh            |  9 ++++
 4 files changed, 105 insertions(+), 34 deletions(-)

diff --git a/pulsar-client-cpp/python/custom_logger_test.py b/pulsar-client-cpp/python/custom_logger_test.py
new file mode 100644
index 0000000..de0600cc
--- /dev/null
+++ b/pulsar-client-cpp/python/custom_logger_test.py
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from unittest import TestCase, main
+import asyncio
+import logging
+from pulsar import Client
+
+class CustomLoggingTest(TestCase):
+
+    serviceUrl = 'pulsar://localhost:6650'
+
+    def test_async_func_with_custom_logger(self):
+        # boost::python::call may fail in C++ destructors, even worse, calls
+        # to PyErr_Print could corrupt the Python interpreter.
+        # See https://github.com/boostorg/python/issues/374 for details.
+        # This test is to verify these functions won't be called in C++ destructors
+        # so that Python's async function works well.
+        client = Client(
+            self.serviceUrl,
+            logger=logging.getLogger('custom-logger')
+        )
+
+        async def async_get(value):
+            consumer = client.subscribe('test_async_get', 'sub')
+            consumer.close()
+            return value
+
+        value = 'foo'
+        result = asyncio.run(async_get(value))
+        self.assertEqual(value, result)
+
+        client.close()
+
+if __name__ == '__main__':
+    logging.basicConfig(encoding='utf-8', level=logging.DEBUG)
+    main()
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index c96bb9d..3d5016f 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -19,7 +19,6 @@
 #
 
 
-import logging
 from unittest import TestCase, main
 import time
 import os
@@ -104,10 +103,6 @@ class PulsarTest(TestCase):
         conf.replicate_subscription_state_enabled(True)
         self.assertEqual(conf.replicate_subscription_state_enabled(), True)
 
-    def test_client_logger(self):
-        logger = logging.getLogger("pulsar")
-        Client(self.serviceUrl, logger=logger)
-
     def test_connect_error(self):
         with self.assertRaises(pulsar.ConnectError):
             client = Client('fakeServiceUrl')
diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc
index 637e793..a287248 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -18,6 +18,8 @@
  */
 #include "utils.h"
 #include <pulsar/ConsoleLoggerFactory.h>
+#include "lib/Utils.h"
+#include <memory>
 
 template<typename T>
 struct ListenerWrapper {
@@ -97,51 +99,40 @@ static ReaderConfiguration& ReaderConfiguration_setCryptoKeyReader(ReaderConfigu
 }
 
 class LoggerWrapper: public Logger {
-    PyObject* _pyLogger;
-    Logger* fallbackLogger;
-    int _currentPythonLogLevel = _getLogLevelValue(Logger::LEVEL_INFO);
-
-    void _updateCurrentPythonLogLevel() {
-        PyGILState_STATE state = PyGILState_Ensure();
-
-        try {
-            _currentPythonLogLevel = py::call_method<int>(_pyLogger, "getEffectiveLevel");
-        } catch (const py::error_already_set& e) {
-            PyErr_Print();
-        }
-
-        PyGILState_Release(state);
-    };
+    PyObject* const _pyLogger;
+    const int _pythonLogLevel;
+    const std::unique_ptr<Logger> _fallbackLogger;
 
-    int _getLogLevelValue(Level level) {
+    static constexpr int _getLogLevelValue(Level level) {
         return 10 + (level * 10);
     }
 
    public:
 
-    LoggerWrapper(const std::string &filename, PyObject* pyLogger) {
-        _pyLogger = pyLogger;
+    LoggerWrapper(PyObject* pyLogger, int pythonLogLevel, Logger* fallbackLogger)
+        : _pyLogger(pyLogger),
+          _pythonLogLevel(pythonLogLevel),
+          _fallbackLogger(fallbackLogger) {
         Py_XINCREF(_pyLogger);
-
-        std::unique_ptr<LoggerFactory> factory(new ConsoleLoggerFactory());
-        fallbackLogger = factory->getLogger(filename);
-
-        _updateCurrentPythonLogLevel();
     }
 
+    LoggerWrapper(const LoggerWrapper&) = delete;
+    LoggerWrapper(LoggerWrapper&&) noexcept = delete;
+    LoggerWrapper& operator=(const LoggerWrapper&) = delete;
+    LoggerWrapper& operator=(LoggerWrapper&&) = delete;
+
     virtual ~LoggerWrapper() {
         Py_XDECREF(_pyLogger);
-        delete fallbackLogger;
     }
 
     bool isEnabled(Level level) {
-        return _getLogLevelValue(level) >= _currentPythonLogLevel;
+        return _getLogLevelValue(level) >= _pythonLogLevel;
     }
 
     void log(Level level, int line, const std::string& message) {
-        if (Py_IsInitialized() != true) {
+        if (!Py_IsInitialized()) {
             // Python logger is unavailable - fallback to console logger
-            fallbackLogger->log(level, line, message);
+            _fallbackLogger->log(level, line, message);
         } else {
             PyGILState_STATE state = PyGILState_Ensure();
 
@@ -162,7 +153,7 @@ class LoggerWrapper: public Logger {
                 }
 
             } catch (const py::error_already_set& e) {
-                PyErr_Print();
+                _fallbackLogger->log(level, line, message);
             }
 
             PyGILState_Release(state);
@@ -171,12 +162,29 @@ class LoggerWrapper: public Logger {
 };
 
 class LoggerWrapperFactory : public LoggerFactory {
+    std::unique_ptr<LoggerFactory> _fallbackLoggerFactory{new ConsoleLoggerFactory};
     PyObject* _pyLogger;
+    Optional<int> _pythonLogLevel{Optional<int>::empty()};
+
+    void initializePythonLogLevel() {
+        PyGILState_STATE state = PyGILState_Ensure();
+
+        try {
+            int level = py::call_method<int>(_pyLogger, "getEffectiveLevel");
+            _pythonLogLevel = Optional<int>::of(level);
+        } catch (const py::error_already_set& e) {
+            // Failed to get log level from _pyLogger, set it to empty to fallback to _fallbackLogger
+            _pythonLogLevel = Optional<int>::empty();
+        }
+
+        PyGILState_Release(state);
+    }
 
    public:
     LoggerWrapperFactory(py::object pyLogger) {
         _pyLogger = pyLogger.ptr();
         Py_XINCREF(_pyLogger);
+        initializePythonLogLevel();
     }
 
     virtual ~LoggerWrapperFactory() {
@@ -184,7 +192,12 @@ class LoggerWrapperFactory : public LoggerFactory {
     }
 
     Logger* getLogger(const std::string &fileName) {
-        return new LoggerWrapper(fileName, _pyLogger);
+        const auto fallbackLogger = _fallbackLoggerFactory->getLogger(fileName);
+        if (_pythonLogLevel.is_present()) {
+            return new LoggerWrapper(_pyLogger, _pythonLogLevel.value(), fallbackLogger);
+        } else {
+            return fallbackLogger;
+        }
     }
 };
 
diff --git a/pulsar-client-cpp/run-unit-tests.sh b/pulsar-client-cpp/run-unit-tests.sh
index 7e40a49..b8d6a8e 100755
--- a/pulsar-client-cpp/run-unit-tests.sh
+++ b/pulsar-client-cpp/run-unit-tests.sh
@@ -73,12 +73,21 @@ if [ $RES -eq 0 ]; then
     cp *_test.py /tmp
     pushd /tmp
 
+    # TODO: this test requires Python >= 3.9 (it uses `asyncio.run` and
+    #  `logging.basicConfig(encoding=...)`). However, CI doesn't support Python 3
+    #  yet; uncomment the following lines after a Python 3 CI script is added.
+    #python custom_logger_test.py
+    #RES=$?
+    #echo "custom_logger_test.py: $RES"
+
     python pulsar_test.py
     RES=$?
+    echo "pulsar_test.py: $RES"
 
     echo "---- Running Python Function Instance unit tests"
     bash $ROOT_DIR/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh
     RES=$?
+    echo "run_python_instance_tests.sh: $RES"
 
     popd
     popd