Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/29 10:43:50 UTC

[GitHub] [airflow] michaelosthege commented on issue #11882: can't clear task SubDagOperator with executor=KubernetesExecutor()

michaelosthege commented on issue #11882:
URL: https://github.com/apache/airflow/issues/11882#issuecomment-1113163483

   We're seeing the same `TypeError: Pickling an AuthenticationString object is disallowed for security reasons` with Airflow 2.2.5 on Ubuntu 22.04, running with the CeleryExecutor.
   Also, [this StackOverflow post](https://stackoverflow.com/questions/67735179/airflow-2-0-celery-dask) reported it for Airflow >2.0 with Celery.
   
   For us, it happens in a Celery worker that runs a task involving `pygmo` (https://esa.github.io/pygmo2/), which makes heavy use of multiprocessing.
   
   Our worker sets `PYTHONOPTIMIZE=1` and `OMP_NUM_THREADS=1`; otherwise, its configuration is fairly standard.
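
One quick check on a worker like this is to see which `AuthenticationString` implementation the current process carries, by inspecting the type of `multiprocessing.current_process().authkey`. The sketch below is a diagnostic under stated assumptions: `authkey_origin` is a hypothetical helper, and the expectation that a Celery prefork worker reports `billiard.process` instead of the stdlib module is inferred from the traceback below, not verified here.

```python
import multiprocessing


def authkey_origin() -> str:
    """Return the module that defines the type of the current process's authkey.

    In a plain Python process this is the stdlib 'multiprocessing.process'.
    Assumption: inside a Celery prefork worker it may be 'billiard.process',
    because billiard can replace the stdlib's notion of the current process.
    """
    key = multiprocessing.current_process().authkey
    return type(key).__module__


if __name__ == "__main__":
    # In a plain interpreter this prints: multiprocessing.process
    print(authkey_origin())
```

Running the same function from inside the failing task (for example, logging its return value before calling pygmo) would show whether the worker's authkey comes from billiard.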
   
   The relevant part of the log:
   
   <details><summary>Worker log</summary>
   
   ```
   [2022-04-28 10:49:58,097] {logging_mixin.py:109} INFO - Creating archipelago with 4 islands. May take some time...
   [2022-04-28 10:49:58,098] {warnings.py:109} WARNING - /opt/conda/lib/python3.9/site-packages/joblib/parallel.py:735: UserWarning: Loky-backed parallel loops cannot be called in a multiprocessing, setting n_jobs=1
     n_jobs = self._backend.configure(n_jobs=self.n_jobs, parallel=self,
   [2022-04-28 10:49:58,098] {logging_mixin.py:109} WARNING - [Parallel(n_jobs=4)]: Using backend SequentialBackend with 1 concurrent workers.
   [2022-04-28 10:51:35,329] {logging_mixin.py:109} WARNING - [Parallel(n_jobs=4)]: Done   4 out of   4 | elapsed:  1.6min finished
   [2022-04-28 10:51:35,348] {taskinstance.py:1774} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/opt/conda/lib/python3.9/site-packages/airflow/operators/python.py", line 174, in execute
       return_value = self.execute_callable()
     File "/opt/conda/lib/python3.9/site-packages/airflow/operators/python.py", line 188, in execute_callable
       return self.python_callable(*self.op_args, **self.op_kwargs)
     File "/usr/local/airflow/dags/flow.py", line 95, in _run
       result = self.callable(context=context, **kwargs)
     File "/usr/local/airflow/dags/flow.py", line 43, in wrapper
       return func(*args, **kwargs)
     File "/usr/local/airflow/dags/dag_pyFOOMB_ParamEst.py", line 48, in pipeline_call
       result = fun(wd=data, **callable_kwargs)
     File "/repos/pyfoomb_paramest/pipeline.py", line 155, in estimate_parameters
       estimates, est_info = caretaker.estimate_parallel(
     File "/opt/conda/lib/python3.9/site-packages/pyfoomb/caretaker.py", line 634, in estimate_parallel
       archipelago = ArchipelagoHelpers.create_archipelago(
     File "/opt/conda/lib/python3.9/site-packages/pyfoomb/generalized_islands.py", line 522, in create_archipelago
       _island = pygmo.island(algo=_algo, pop=_pop, udi=pygmo.mp_island())
     File "/opt/conda/lib/python3.9/site-packages/pygmo/_py_islands.py", line 119, in __init__
       self._init(use_pool)
     File "/opt/conda/lib/python3.9/site-packages/pygmo/_py_islands.py", line 130, in _init
       mp_island.init_pool()
     File "/opt/conda/lib/python3.9/site-packages/pygmo/_py_islands.py", line 338, in init_pool
       mp_island._init_pool_impl(processes)
     File "/opt/conda/lib/python3.9/site-packages/pygmo/_py_islands.py", line 315, in _init_pool_impl
       mp_island._pool, mp_island._pool_size = _make_pool(processes)
     File "/opt/conda/lib/python3.9/site-packages/pygmo/_mp_utils.py", line 62, in _make_pool
       pool = mp_ctx.Pool(processes=processes)
     File "/opt/conda/lib/python3.9/multiprocessing/context.py", line 119, in Pool
       return Pool(processes, initializer, initargs, maxtasksperchild,
     File "/opt/conda/lib/python3.9/multiprocessing/pool.py", line 212, in __init__
       self._repopulate_pool()
     File "/opt/conda/lib/python3.9/multiprocessing/pool.py", line 303, in _repopulate_pool
       return self._repopulate_pool_static(self._ctx, self.Process,
     File "/opt/conda/lib/python3.9/multiprocessing/pool.py", line 326, in _repopulate_pool_static
       w.start()
     File "/opt/conda/lib/python3.9/multiprocessing/process.py", line 121, in start
       self._popen = self._Popen(self)
     File "/opt/conda/lib/python3.9/multiprocessing/context.py", line 284, in _Popen
       return Popen(process_obj)
     File "/opt/conda/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 32, in __init__
       super().__init__(process_obj)
     File "/opt/conda/lib/python3.9/multiprocessing/popen_fork.py", line 19, in __init__
       self._launch(process_obj)
     File "/opt/conda/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 46, in _launch
       reduction.dump(prep_data, fp)
     File "/opt/conda/lib/python3.9/multiprocessing/reduction.py", line 60, in dump
       ForkingPickler(file, protocol).dump(obj)
     File "/opt/conda/lib/python3.9/site-packages/billiard/process.py", line 365, in __reduce__
       raise TypeError(
   TypeError: Pickling an AuthenticationString object is disallowed for security reasons
   ```
   
   </details>
   
   Any ideas?


-- 
This is an automated message from the Apache Git Service.