You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2021/08/31 16:49:11 UTC
[pulsar] branch master updated: [Python] Handle py::call_method
error without mutating internal state (#11840)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9153e71 [Python] Handle py::call_method error without mutating internal state (#11840)
9153e71 is described below
commit 9153e7131736decf526684a3bddc4d3f7e307ce3
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Sep 1 00:48:25 2021 +0800
[Python] Handle py::call_method error without mutating internal state (#11840)
Fixes #11823
### Motivation
When the Python logger is customized with underlying `LoggerWrapper` objects, `async` Python functions may sometimes return an incorrect value such as `None`. This is caused by a bug (or feature?) in Boost.Python: `py::call_method` fails when it is invoked from a C++ object's destructor. See https://github.com/boostorg/python/issues/374 for details.
For the code example in #11823, the failure happens in `ConsumerImpl`'s destructor: the logger for `AckGroupingTrackerEnabled` is created again there, because the logger is thread local and is created anew in each new thread. In this case, `py::call_method` in `LoggerWrapper#_updateCurrentPythonLogLevel` fails, `PyErr_Print` is called, and the error indicator is cleared, which causes the `async` function's result to become `None`.
### Modifications
- Reduce unnecessary `Logger.getEffectiveLevel` calls: get the Python log level only once, when the logger factory is initialized, and pass the same level to all loggers.
- Remove the `PyErr_Print` calls from the `LoggerWrapper`-related code. When `py::call_method` fails, use the fallback logger to print the logs instead.
- Add a separate test file for the custom logger test, because once the `LoggerFactory` is set, all other tests are affected.
### Verifying this change
- [x] Make sure that the change passes the CI checks.
This change adds the test `CustomLoggingTest`. Since the `asyncio` module was introduced in Python 3.3 while the CI is based on Python 2.7, this test cannot be run by the CI until a Python 3 based CI is added.
---
pulsar-client-cpp/python/custom_logger_test.py | 54 ++++++++++++++++++++
pulsar-client-cpp/python/pulsar_test.py | 5 --
pulsar-client-cpp/python/src/config.cc | 71 +++++++++++++++-----------
pulsar-client-cpp/run-unit-tests.sh | 9 ++++
4 files changed, 105 insertions(+), 34 deletions(-)
diff --git a/pulsar-client-cpp/python/custom_logger_test.py b/pulsar-client-cpp/python/custom_logger_test.py
new file mode 100644
index 0000000..de0600cc
--- /dev/null
+++ b/pulsar-client-cpp/python/custom_logger_test.py
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from unittest import TestCase, main
+import asyncio
+import logging
+from pulsar import Client
+
+class CustomLoggingTest(TestCase):
+
+ serviceUrl = 'pulsar://localhost:6650'
+
+ def test_async_func_with_custom_logger(self):
+ # boost::python::call may fail in C++ destructors, even worse, calls
+ # to PyErr_Print could corrupt the Python interpreter.
+ # See https://github.com/boostorg/python/issues/374 for details.
+ # This test is to verify these functions won't be called in C++ destructors
+ # so that Python's async function works well.
+ client = Client(
+ self.serviceUrl,
+ logger=logging.getLogger('custom-logger')
+ )
+
+ async def async_get(value):
+ consumer = client.subscribe('test_async_get', 'sub')
+ consumer.close()
+ return value
+
+ value = 'foo'
+ result = asyncio.run(async_get(value))
+ self.assertEqual(value, result)
+
+ client.close()
+
+if __name__ == '__main__':
+ logging.basicConfig(encoding='utf-8', level=logging.DEBUG)
+ main()
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index c96bb9d..3d5016f 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -19,7 +19,6 @@
#
-import logging
from unittest import TestCase, main
import time
import os
@@ -104,10 +103,6 @@ class PulsarTest(TestCase):
conf.replicate_subscription_state_enabled(True)
self.assertEqual(conf.replicate_subscription_state_enabled(), True)
- def test_client_logger(self):
- logger = logging.getLogger("pulsar")
- Client(self.serviceUrl, logger=logger)
-
def test_connect_error(self):
with self.assertRaises(pulsar.ConnectError):
client = Client('fakeServiceUrl')
diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc
index 637e793..a287248 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -18,6 +18,8 @@
*/
#include "utils.h"
#include <pulsar/ConsoleLoggerFactory.h>
+#include "lib/Utils.h"
+#include <memory>
template<typename T>
struct ListenerWrapper {
@@ -97,51 +99,40 @@ static ReaderConfiguration& ReaderConfiguration_setCryptoKeyReader(ReaderConfigu
}
class LoggerWrapper: public Logger {
- PyObject* _pyLogger;
- Logger* fallbackLogger;
- int _currentPythonLogLevel = _getLogLevelValue(Logger::LEVEL_INFO);
-
- void _updateCurrentPythonLogLevel() {
- PyGILState_STATE state = PyGILState_Ensure();
-
- try {
- _currentPythonLogLevel = py::call_method<int>(_pyLogger, "getEffectiveLevel");
- } catch (const py::error_already_set& e) {
- PyErr_Print();
- }
-
- PyGILState_Release(state);
- };
+ PyObject* const _pyLogger;
+ const int _pythonLogLevel;
+ const std::unique_ptr<Logger> _fallbackLogger;
- int _getLogLevelValue(Level level) {
+ static constexpr int _getLogLevelValue(Level level) {
return 10 + (level * 10);
}
public:
- LoggerWrapper(const std::string &filename, PyObject* pyLogger) {
- _pyLogger = pyLogger;
+ LoggerWrapper(PyObject* pyLogger, int pythonLogLevel, Logger* fallbackLogger)
+ : _pyLogger(pyLogger),
+ _pythonLogLevel(pythonLogLevel),
+ _fallbackLogger(fallbackLogger) {
Py_XINCREF(_pyLogger);
-
- std::unique_ptr<LoggerFactory> factory(new ConsoleLoggerFactory());
- fallbackLogger = factory->getLogger(filename);
-
- _updateCurrentPythonLogLevel();
}
+ LoggerWrapper(const LoggerWrapper&) = delete;
+ LoggerWrapper(LoggerWrapper&&) noexcept = delete;
+ LoggerWrapper& operator=(const LoggerWrapper&) = delete;
+ LoggerWrapper& operator=(LoggerWrapper&&) = delete;
+
virtual ~LoggerWrapper() {
Py_XDECREF(_pyLogger);
- delete fallbackLogger;
}
bool isEnabled(Level level) {
- return _getLogLevelValue(level) >= _currentPythonLogLevel;
+ return _getLogLevelValue(level) >= _pythonLogLevel;
}
void log(Level level, int line, const std::string& message) {
- if (Py_IsInitialized() != true) {
+ if (!Py_IsInitialized()) {
// Python logger is unavailable - fallback to console logger
- fallbackLogger->log(level, line, message);
+ _fallbackLogger->log(level, line, message);
} else {
PyGILState_STATE state = PyGILState_Ensure();
@@ -162,7 +153,7 @@ class LoggerWrapper: public Logger {
}
} catch (const py::error_already_set& e) {
- PyErr_Print();
+ _fallbackLogger->log(level, line, message);
}
PyGILState_Release(state);
@@ -171,12 +162,29 @@ class LoggerWrapper: public Logger {
};
class LoggerWrapperFactory : public LoggerFactory {
+ std::unique_ptr<LoggerFactory> _fallbackLoggerFactory{new ConsoleLoggerFactory};
PyObject* _pyLogger;
+ Optional<int> _pythonLogLevel{Optional<int>::empty()};
+
+ void initializePythonLogLevel() {
+ PyGILState_STATE state = PyGILState_Ensure();
+
+ try {
+ int level = py::call_method<int>(_pyLogger, "getEffectiveLevel");
+ _pythonLogLevel = Optional<int>::of(level);
+ } catch (const py::error_already_set& e) {
+ // Failed to get log level from _pyLogger, set it to empty to fallback to _fallbackLogger
+ _pythonLogLevel = Optional<int>::empty();
+ }
+
+ PyGILState_Release(state);
+ }
public:
LoggerWrapperFactory(py::object pyLogger) {
_pyLogger = pyLogger.ptr();
Py_XINCREF(_pyLogger);
+ initializePythonLogLevel();
}
virtual ~LoggerWrapperFactory() {
@@ -184,7 +192,12 @@ class LoggerWrapperFactory : public LoggerFactory {
}
Logger* getLogger(const std::string &fileName) {
- return new LoggerWrapper(fileName, _pyLogger);
+ const auto fallbackLogger = _fallbackLoggerFactory->getLogger(fileName);
+ if (_pythonLogLevel.is_present()) {
+ return new LoggerWrapper(_pyLogger, _pythonLogLevel.value(), fallbackLogger);
+ } else {
+ return fallbackLogger;
+ }
}
};
diff --git a/pulsar-client-cpp/run-unit-tests.sh b/pulsar-client-cpp/run-unit-tests.sh
index 7e40a49..b8d6a8e 100755
--- a/pulsar-client-cpp/run-unit-tests.sh
+++ b/pulsar-client-cpp/run-unit-tests.sh
@@ -73,12 +73,21 @@ if [ $RES -eq 0 ]; then
cp *_test.py /tmp
pushd /tmp
+ # TODO: this test requires asyncio module that is supported by Python >= 3.3.
+ # However, CI doesn't support Python3 yet, we should uncomment following
+ # lines after Python3 CI script is added.
+ #python custom_logger_test.py
+ #RES=$?
+ #echo "custom_logger_test.py: $RES"
+
python pulsar_test.py
RES=$?
+ echo "pulsar_test.py: $RES"
echo "---- Running Python Function Instance unit tests"
bash $ROOT_DIR/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh
RES=$?
+ echo "run_python_instance_tests.sh: $RES"
popd
popd