Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/29 10:43:50 UTC
[GitHub] [airflow] michaelosthege commented on issue #11882: can't clear task SubDagOperator with executor=KubernetesExecutor()
michaelosthege commented on issue #11882:
URL: https://github.com/apache/airflow/issues/11882#issuecomment-1113163483
We're seeing this with Airflow 2.2.5 on Ubuntu 22.04 running with the CeleryExecutor.
Also, [this StackOverflow post](https://stackoverflow.com/questions/67735179/airflow-2-0-celery-dask) reports it for Airflow >2.0 with Celery.
For us it happens in a Celery worker that's trying to run a task involving `pygmo` (https://esa.github.io/pygmo2/), which uses a lot of multiprocessing.
Our worker has `PYTHONOPTIMIZE=1` and `OMP_NUM_THREADS=1` set, but otherwise the setup is pretty standard.
Relevant part of the log is here:
<details><summary>Worker log</summary>
```
[2022-04-28 10:49:58,097] {logging_mixin.py:109} INFO - Creating archipelago with 4 islands. May take some time...
[2022-04-28 10:49:58,098] {warnings.py:109} WARNING - /opt/conda/lib/python3.9/site-packages/joblib/parallel.py:735: UserWarning: Loky-backed parallel loops cannot be called in a multiprocessing, setting n_jobs=1
n_jobs = self._backend.configure(n_jobs=self.n_jobs, parallel=self,
[2022-04-28 10:49:58,098] {logging_mixin.py:109} WARNING - [Parallel(n_jobs=4)]: Using backend SequentialBackend with 1 concurrent workers.
[2022-04-28 10:51:35,329] {logging_mixin.py:109} WARNING - [Parallel(n_jobs=4)]: Done 4 out of 4 | elapsed: 1.6min finished
[2022-04-28 10:51:35,348] {taskinstance.py:1774} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/conda/lib/python3.9/site-packages/airflow/operators/python.py", line 174, in execute
return_value = self.execute_callable()
File "/opt/conda/lib/python3.9/site-packages/airflow/operators/python.py", line 188, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/usr/local/airflow/dags/flow.py", line 95, in _run
result = self.callable(context=context, **kwargs)
File "/usr/local/airflow/dags/flow.py", line 43, in wrapper
return func(*args, **kwargs)
File "/usr/local/airflow/dags/dag_pyFOOMB_ParamEst.py", line 48, in pipeline_call
result = fun(wd=data, **callable_kwargs)
File "/repos/pyfoomb_paramest/pipeline.py", line 155, in estimate_parameters
estimates, est_info = caretaker.estimate_parallel(
File "/opt/conda/lib/python3.9/site-packages/pyfoomb/caretaker.py", line 634, in estimate_parallel
archipelago = ArchipelagoHelpers.create_archipelago(
File "/opt/conda/lib/python3.9/site-packages/pyfoomb/generalized_islands.py", line 522, in create_archipelago
_island = pygmo.island(algo=_algo, pop=_pop, udi=pygmo.mp_island())
File "/opt/conda/lib/python3.9/site-packages/pygmo/_py_islands.py", line 119, in __init__
self._init(use_pool)
File "/opt/conda/lib/python3.9/site-packages/pygmo/_py_islands.py", line 130, in _init
mp_island.init_pool()
File "/opt/conda/lib/python3.9/site-packages/pygmo/_py_islands.py", line 338, in init_pool
mp_island._init_pool_impl(processes)
File "/opt/conda/lib/python3.9/site-packages/pygmo/_py_islands.py", line 315, in _init_pool_impl
mp_island._pool, mp_island._pool_size = _make_pool(processes)
File "/opt/conda/lib/python3.9/site-packages/pygmo/_mp_utils.py", line 62, in _make_pool
pool = mp_ctx.Pool(processes=processes)
File "/opt/conda/lib/python3.9/multiprocessing/context.py", line 119, in Pool
return Pool(processes, initializer, initargs, maxtasksperchild,
File "/opt/conda/lib/python3.9/multiprocessing/pool.py", line 212, in __init__
self._repopulate_pool()
File "/opt/conda/lib/python3.9/multiprocessing/pool.py", line 303, in _repopulate_pool
return self._repopulate_pool_static(self._ctx, self.Process,
File "/opt/conda/lib/python3.9/multiprocessing/pool.py", line 326, in _repopulate_pool_static
w.start()
File "/opt/conda/lib/python3.9/multiprocessing/process.py", line 121, in start
self._popen = self._Popen(self)
File "/opt/conda/lib/python3.9/multiprocessing/context.py", line 284, in _Popen
return Popen(process_obj)
File "/opt/conda/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 32, in __init__
super().__init__(process_obj)
File "/opt/conda/lib/python3.9/multiprocessing/popen_fork.py", line 19, in __init__
self._launch(process_obj)
File "/opt/conda/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 46, in _launch
reduction.dump(prep_data, fp)
File "/opt/conda/lib/python3.9/multiprocessing/reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
File "/opt/conda/lib/python3.9/site-packages/billiard/process.py", line 365, in __reduce__
raise TypeError(
TypeError: Pickling an AuthenticationString object is disallowed for security reasons
```
</details>
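For reference, the standard library has a guard with the same error message: `multiprocessing.process.AuthenticationString` refuses to be pickled unless a spawning `Popen` is currently active. Below is a minimal sketch of that guard using only the standard library. It does not involve Airflow, Celery, billiard or pygmo, and the helper name `try_pickle_authkey` is made up for illustration. I'm assuming billiard's check in `billiard/process.py` works analogously, which the traceback suggests but does not prove.

```python
import multiprocessing
import pickle


def try_pickle_authkey():
    """Try to pickle the current process's authkey outside of a spawn.

    Returns the error message if pickling is refused, otherwise None.
    """
    # The authkey of a process is an AuthenticationString (a bytes subclass).
    authkey = multiprocessing.current_process().authkey
    try:
        # No spawning Popen is active here, so the guard should refuse.
        pickle.dumps(authkey)
    except TypeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    print(try_pickle_authkey())
```

If the assumption holds, a mismatch would explain the log above: the stdlib `popen_spawn_posix` is doing the spawning, but the object being pickled is guarded by billiard's `__reduce__`.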
Any ideas?