Posted to commits@airflow.apache.org by "JonnyWaffles (via GitHub)" <gi...@apache.org> on 2023/03/10 20:31:50 UTC
[GitHub] [airflow] JonnyWaffles opened a new issue, #30029: Dynamically Mapped Task Dag.test causes _Py_CheckRecursiveCall when run as Task
JonnyWaffles opened a new issue, #30029:
URL: https://github.com/apache/airflow/issues/30029
### Apache Airflow version
2.5.1
### What happened
Executor: Celery
Airflow V 2.5.1
Platform: Python 3.9
OS: ubuntu20.04
Hi team, this one is pretty wild.
To integrate with our CI/CD, we run pytest as a task inside a Dag, like so:
```python
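import tempfile
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Dict, Union

import pytest
from airflow.decorators import task
from airflow.models import Variable

# JUNIT_XML_EXCEPTION_STRING, PYTEST_RET_CODE_XCOM_KEY and JUNIT_XML_XCOM_KEY
# are module-level string constants defined elsewhere (not shown).

@task(multiple_outputs=True)
def run_pytest(params=None) -> Dict[str, Union[int, str]]:
    with tempfile.NamedTemporaryFile() as f:
        # Prefer pytest_args from the Dag run params, else the Airflow Variable.
        pytest_args = (params or {}).get("pytest_args") or Variable.get(
            "pytest_args"
        ).split()
        args = [*pytest_args, "--ignore", __file__, f"--junitxml={f.name}"]
        code = pytest.main(args)  # runs pytest in the task's own process
        xml_text = Path(f.name).read_text()
    try:
        # Replace the report with an error message if it is not valid XML.
        ET.fromstring(xml_text)
    except ET.ParseError as err:
        xml_text = f"{JUNIT_XML_EXCEPTION_STRING} Original exception: {err}"
    return {PYTEST_RET_CODE_XCOM_KEY: code, JUNIT_XML_XCOM_KEY: xml_text}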
```
This has been working great! I was excited to use the new `dag.test()` method to run our test Dags inside the pipeline. Unfortunately, if you call `dag.test()` on any Dag with dynamically mapped tasks, the logging system recurses until Python hits its recursion limit.
You can easily reproduce this behavior with the following test case:
```python
from airflow.example_dags.example_dynamic_task_mapping import dag


def test_map_dag():
    dag.test()
```
Then run it as a task using the `run_pytest` task above. You'll see a stack trace like this:
```
File "/app/.local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", line 149, in write
2023-03-10T20:18:54.316081052Z self.flush()
2023-03-10T20:18:54.316083250Z File "/app/.local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", line 156, in flush
2023-03-10T20:18:54.316085497Z self._propagate_log(buf)
2023-03-10T20:18:54.316088048Z File "/app/.local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", line 137, in _propagate_log
2023-03-10T20:18:54.316090370Z self.logger.log(self.level, remove_escape_codes(message))
2023-03-10T20:18:54.316092740Z File "/usr/lib/python3.9/logging/__init__.py", line 1512, in log
2023-03-10T20:18:54.316095067Z self._log(level, msg, args, **kwargs)
2023-03-10T20:18:54.316097285Z File "/usr/lib/python3.9/logging/__init__.py", line 1589, in _log
2023-03-10T20:18:54.316102712Z self.handle(record)
2023-03-10T20:18:54.316105214Z File "/usr/lib/python3.9/logging/__init__.py", line 1599, in handle
2023-03-10T20:18:54.316107414Z self.callHandlers(record)
2023-03-10T20:18:54.316109533Z File "/usr/lib/python3.9/logging/__init__.py", line 1661, in callHandlers
2023-03-10T20:18:54.316111769Z hdlr.handle(record)
2023-03-10T20:18:54.316113875Z File "/usr/lib/python3.9/logging/__init__.py", line 952, in handle
2023-03-10T20:18:54.316116060Z self.emit(record)
2023-03-10T20:18:54.316118210Z File "/usr/lib/python3.9/logging/__init__.py", line 1091, in emit
2023-03-10T20:18:54.316120464Z self.handleError(record)
2023-03-10T20:18:54.316122649Z File "/usr/lib/python3.9/logging/__init__.py", line 1004, in handleError
2023-03-10T20:18:54.316124890Z sys.stderr.write('--- Logging error ---\n')
2023-03-10T20:18:54.316127043Z File "/app/.local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", line 149, in write
2023-03-10T20:18:54.316129277Z self.flush()
2023-03-10T20:18:54.316131450Z File "/app/.local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", line 156, in flush
2023-03-10T20:18:54.316133804Z self._propagate_log(buf)
2023-03-10T20:18:54.316135984Z File "/app/.local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", line 137, in _propagate_log
```
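And so on. You can see the cycle: `write` calls `flush`, which calls `_propagate_log`, which logs the message; the logging handler fails and calls `handleError`, which writes to `sys.stderr`. That stream is Airflow's logging writer again, so `write` is called once more.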
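The cycle can be reproduced without Airflow in a minimal sketch. `LoggerBackedStream`, `failing_handler` and `reproduce_cycle` are hypothetical names, not Airflow code; the model assumes, as the traceback suggests, that the logging handler ends up writing to `sys.stderr` while `sys.stderr` is itself the logger-backed stream.

```python
import sys


class LoggerBackedStream:
    """Simplified, hypothetical model of a stdout/stderr replacement that forwards
    each complete line to a logging handler. Not Airflow's real class; it only
    mirrors the write -> flush -> _propagate_log path from the traceback."""

    def __init__(self, handler):
        self.handler = handler  # callable standing in for logger.log(...)
        self._buffer = ""

    def write(self, message):
        self._buffer += message
        if self._buffer.endswith("\n"):
            self.flush()
        return len(message)

    def flush(self):
        buf, self._buffer = self._buffer, ""
        if buf:
            self._propagate_log(buf)

    def _propagate_log(self, message):
        self.handler(message)


def failing_handler(message):
    # Stands in for emit() failing and handleError() reporting the failure
    # on whatever sys.stderr currently is (the message itself is ignored).
    sys.stderr.write("--- Logging error ---\n")


def reproduce_cycle():
    """Return True if writing a single line ends in a RecursionError."""
    original_stderr = sys.stderr
    sys.stderr = LoggerBackedStream(failing_handler)
    try:
        sys.stderr.write("hello\n")
    except RecursionError:
        return True
    finally:
        sys.stderr = original_stderr  # always restore the real stream
    return False
```

Calling `reproduce_cycle()` returns `True`: every line the handler writes re-enters `write`, so the call stack grows until Python's recursion limit is reached, which is presumably what the `_Py_CheckRecursiveCall` in the issue title refers to.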
### What you think should happen instead
It would be nice to be able to run `dag.test()` inside a PythonOperator task so we can easily run pytest as a remotely triggered Dag.
### How to reproduce
1. Create a test case
```python
from airflow.example_dags.example_dynamic_task_mapping import dag


def test_map_dag():
    dag.test()
```
2. Run the test case via Pytest inside a PythonOperator Task (using CeleryExecutor)
### Operating System
ubuntu20.04
### Versions of Apache Airflow Providers
You don't need any providers to reproduce
### Deployment
Docker-Compose
### Deployment details
The scheduler, worker, webserver, and Redis all run inside a local Docker Compose setup. There is a single worker container with default Celery settings.
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] potiuk closed issue #30029: Dag.test causes Logging Recursion when run as a Python Task (Celery)
Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk closed issue #30029: Dag.test causes Logging Recursion when run as a Python Task (Celery)
URL: https://github.com/apache/airflow/issues/30029
[GitHub] [airflow] JonnyWaffles commented on issue #30029: Dynamically Mapped Task Dag.test causes _Py_CheckRecursiveCall when run as Task
Posted by "JonnyWaffles (via GitHub)" <gi...@apache.org>.
JonnyWaffles commented on issue #30029:
URL: https://github.com/apache/airflow/issues/30029#issuecomment-1464457619
I was wrong: non-dynamic Dags also break. You can try the same thing with:
```python
def test_nondynamic_dag():
    from airflow.example_dags.tutorial_dag import dag

    dag.test()
```
This also triggers the infinite logging recursion.
[GitHub] [airflow] potiuk commented on issue #30029: Dag.test causes Logging Recursion when run as a Python Task (Celery)
Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on issue #30029:
URL: https://github.com/apache/airflow/issues/30029#issuecomment-1468848076
Yes. This is how logging currently works in the released version: it takes stdout and redirects it to the logger inside the task. If pytest performs another redirection in the same process (which it does by default), the two recurse, because the stdout that pytest gets is effectively already a logger. What you can try is to run pytest as a subprocess, or to disable pytest's own log redirection (look up the pytest options and try what works).
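A minimal sketch of the subprocess workaround suggested above, assuming pytest is installed in the same interpreter that runs the Celery worker. `run_pytest_in_subprocess` is a hypothetical helper, not an Airflow or pytest API; it would replace the in-process `pytest.main(args)` call inside the `run_pytest` task.

```python
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import List, Tuple


def run_pytest_in_subprocess(pytest_args: List[str]) -> Tuple[int, str, str]:
    """Run pytest in a child interpreter instead of calling pytest.main() in-process.

    The child process gets its own stdout/stderr (pipes), so pytest's output
    capturing never sees the task's logger-backed streams.
    Returns (exit_code, combined_output, junit_xml_text).
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        junit_path = Path(tmp_dir) / "junit.xml"
        completed = subprocess.run(
            [sys.executable, "-m", "pytest", *pytest_args, f"--junitxml={junit_path}"],
            capture_output=True,  # collect the child's stdout/stderr as strings
            text=True,
        )
        # The report may be missing, e.g. if pytest is not installed
        # or exits before running a test session.
        xml_text = junit_path.read_text() if junit_path.exists() else ""
    return completed.returncode, completed.stdout + completed.stderr, xml_text
```

The task can then log the captured output itself and push the exit code and XML to XCom as before. The in-process alternative would be to pass pytest options that turn off its own capturing, such as `-s` (`--capture=no`) and `-p no:logging` (disables pytest's logging plugin); whether those alone avoid this recursion is untested here.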
We are also making some improvements in the upcoming 2.6 version that might change this behaviour, so it would be best to try the main version and see whether it behaves better.
For now I am converting this to a discussion in case more is needed.