Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/07 07:05:25 UTC

[GitHub] [airflow] xawyong commented on issue #11451: AttributeError: Can't pickle local object 'DaskExecutor.execute_async.<locals>.airflow_run'

xawyong commented on issue #11451:
URL: https://github.com/apache/airflow/issues/11451#issuecomment-755929744


   Using 2.0.0, after changing the code as @lafrinte proposed, I got the error below.
   
   ```
     ____________       _____________
    ____    |__( )_________  __/__  /________      __
   ____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
   ___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
    _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   [2021-01-07 02:01:14,768] {scheduler_job.py:1241} INFO - Starting the scheduler
   [2021-01-07 02:01:14,769] {scheduler_job.py:1246} INFO - Processing each file at most -1 times
   /usr/local/lib/python3.7/site-packages/distributed/client.py:1128 VersionMismatchWarning: Mismatched versions found
   
   +-------------+-----------+-----------------------+-----------------------+
   | Package     | client    | scheduler             | workers               |
   +-------------+-----------+-----------------------+-----------------------+
   | distributed | 2020.12.0 | 2020.12.0+11.g4386b75 | 2020.12.0+11.g4386b75 |
   +-------------+-----------+-----------------------+-----------------------+
   [2021-01-07 02:01:14,838] {dag_processing.py:250} INFO - Launched DagFileProcessorManager with pid: 18779
   [2021-01-07 02:01:14,841] {scheduler_job.py:1751} INFO - Resetting orphaned tasks for active dag runs
   [2021-01-07 02:01:14,849] {settings.py:52} INFO - Configured default timezone Timezone('UTC')
   [2021-01-07 02:02:04,469] {scheduler_job.py:938} INFO - 4 tasks up for execution:
   	<TaskInstance: example_bash_operator.runme_0 2021-01-07 07:02:03.530249+00:00 [scheduled]>
   	<TaskInstance: example_bash_operator.runme_1 2021-01-07 07:02:03.530249+00:00 [scheduled]>
   	<TaskInstance: example_bash_operator.runme_2 2021-01-07 07:02:03.530249+00:00 [scheduled]>
   	<TaskInstance: example_bash_operator.also_run_this 2021-01-07 07:02:03.530249+00:00 [scheduled]>
   [2021-01-07 02:02:04,470] {scheduler_job.py:972} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 4 task instances ready to be queued
   [2021-01-07 02:02:04,470] {scheduler_job.py:999} INFO - DAG example_bash_operator has 0/16 running and queued tasks
   [2021-01-07 02:02:04,471] {scheduler_job.py:999} INFO - DAG example_bash_operator has 1/16 running and queued tasks
   [2021-01-07 02:02:04,471] {scheduler_job.py:999} INFO - DAG example_bash_operator has 2/16 running and queued tasks
   [2021-01-07 02:02:04,471] {scheduler_job.py:999} INFO - DAG example_bash_operator has 3/16 running and queued tasks
   [2021-01-07 02:02:04,471] {scheduler_job.py:1060} INFO - Setting the following tasks to queued state:
   	<TaskInstance: example_bash_operator.runme_0 2021-01-07 07:02:03.530249+00:00 [scheduled]>
   	<TaskInstance: example_bash_operator.runme_1 2021-01-07 07:02:03.530249+00:00 [scheduled]>
   	<TaskInstance: example_bash_operator.runme_2 2021-01-07 07:02:03.530249+00:00 [scheduled]>
   	<TaskInstance: example_bash_operator.also_run_this 2021-01-07 07:02:03.530249+00:00 [scheduled]>
   [2021-01-07 02:02:04,473] {scheduler_job.py:1102} INFO - Sending TaskInstanceKey(dag_id='example_bash_operator', task_id='runme_0', execution_date=datetime.datetime(2021, 1, 7, 7, 2, 3, 530249, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 3 and queue default
   [2021-01-07 02:02:04,473] {base_executor.py:79} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_0', '2021-01-07T07:02:03.530249+00:00', '--local', '--pool', 'default_pool', '--subdir', '/usr/local/lib/python3.7/site-packages/airflow/example_dags/example_bash_operator.py']
   [2021-01-07 02:02:04,473] {scheduler_job.py:1102} INFO - Sending TaskInstanceKey(dag_id='example_bash_operator', task_id='runme_1', execution_date=datetime.datetime(2021, 1, 7, 7, 2, 3, 530249, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 3 and queue default
   [2021-01-07 02:02:04,474] {base_executor.py:79} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_1', '2021-01-07T07:02:03.530249+00:00', '--local', '--pool', 'default_pool', '--subdir', '/usr/local/lib/python3.7/site-packages/airflow/example_dags/example_bash_operator.py']
   [2021-01-07 02:02:04,474] {scheduler_job.py:1102} INFO - Sending TaskInstanceKey(dag_id='example_bash_operator', task_id='runme_2', execution_date=datetime.datetime(2021, 1, 7, 7, 2, 3, 530249, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 3 and queue default
   [2021-01-07 02:02:04,474] {base_executor.py:79} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_2', '2021-01-07T07:02:03.530249+00:00', '--local', '--pool', 'default_pool', '--subdir', '/usr/local/lib/python3.7/site-packages/airflow/example_dags/example_bash_operator.py']
   [2021-01-07 02:02:04,474] {scheduler_job.py:1102} INFO - Sending TaskInstanceKey(dag_id='example_bash_operator', task_id='also_run_this', execution_date=datetime.datetime(2021, 1, 7, 7, 2, 3, 530249, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 2 and queue default
   [2021-01-07 02:02:04,474] {base_executor.py:79} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'example_bash_operator', 'also_run_this', '2021-01-07T07:02:03.530249+00:00', '--local', '--pool', 'default_pool', '--subdir', '/usr/local/lib/python3.7/site-packages/airflow/example_dags/example_bash_operator.py']
   [2021-01-07 02:02:06,343] {dask_executor.py:94} ERROR - Failed to execute task: CalledProcessError(120, ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_0', '2021-01-07T07:02:03.530249+00:00', '--local', '--pool', 'default_pool', '--subdir', '/usr/local/lib/python3.7/site-packages/airflow/example_dags/example_bash_operator.py'])
   [2021-01-07 02:02:06,344] {dask_executor.py:94} ERROR - Failed to execute task: CalledProcessError(120, ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_2', '2021-01-07T07:02:03.530249+00:00', '--local', '--pool', 'default_pool', '--subdir', '/usr/local/lib/python3.7/site-packages/airflow/example_dags/example_bash_operator.py'])
   [2021-01-07 02:02:06,345] {dask_executor.py:94} ERROR - Failed to execute task: CalledProcessError(120, ['airflow', 'tasks', 'run', 'example_bash_operator', 'also_run_this', '2021-01-07T07:02:03.530249+00:00', '--local', '--pool', 'default_pool', '--subdir', '/usr/local/lib/python3.7/site-packages/airflow/example_dags/example_bash_operator.py'])
   [2021-01-07 02:02:06,345] {scheduler_job.py:1200} INFO - Executor reports execution of example_bash_operator.runme_0 execution_date=2021-01-07 07:02:03.530249+00:00 exited with status failed for try_number 1
   [2021-01-07 02:02:06,345] {scheduler_job.py:1200} INFO - Executor reports execution of example_bash_operator.runme_2 execution_date=2021-01-07 07:02:03.530249+00:00 exited with status failed for try_number 1
   [2021-01-07 02:02:06,345] {scheduler_job.py:1200} INFO - Executor reports execution of example_bash_operator.also_run_this execution_date=2021-01-07 07:02:03.530249+00:00 exited with status failed for try_number 1
   [2021-01-07 02:02:06,350] {scheduler_job.py:1229} ERROR - Executor reports task instance <TaskInstance: example_bash_operator.runme_0 2021-01-07 07:02:03.530249+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
   [2021-01-07 02:02:06,351] {scheduler_job.py:1229} ERROR - Executor reports task instance <TaskInstance: example_bash_operator.runme_2 2021-01-07 07:02:03.530249+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
   [2021-01-07 02:02:06,352] {scheduler_job.py:1229} ERROR - Executor reports task instance <TaskInstance: example_bash_operator.also_run_this 2021-01-07 07:02:03.530249+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
   [2021-01-07 02:02:07,410] {dask_executor.py:94} ERROR - Failed to execute task: CalledProcessError(120, ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_1', '2021-01-07T07:02:03.530249+00:00', '--local', '--pool', 'default_pool', '--subdir', '/usr/local/lib/python3.7/site-packages/airflow/example_dags/example_bash_operator.py'])
   [2021-01-07 02:02:07,410] {scheduler_job.py:1200} INFO - Executor reports execution of example_bash_operator.runme_1 execution_date=2021-01-07 07:02:03.530249+00:00 exited with status failed for try_number 1
   [2021-01-07 02:02:07,415] {scheduler_job.py:1229} ERROR - Executor reports task instance <TaskInstance: example_bash_operator.runme_1 2021-01-07 07:02:03.530249+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
   Process ForkProcess-1:
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
       self.run()
     File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
       self._target(*self._args, **self._kwargs)
     File "/usr/local/lib/python3.7/site-packages/airflow/utils/dag_processing.py", line 365, in _run_processor_manager
       processor_manager.start()
     File "/usr/local/lib/python3.7/site-packages/airflow/utils/dag_processing.py", line 596, in start
       return self._run_parsing_loop()
     File "/usr/local/lib/python3.7/site-packages/airflow/utils/dag_processing.py", line 659, in _run_parsing_loop
       self._processors.pop(processor.file_path)
   KeyError: '/usr/local/lib/python3.7/site-packages/airflow/example_dags/example_bash_operator.py'
   [2021-01-07 02:02:08,445] {dagrun.py:429} ERROR - Marking run <DagRun example_bash_operator @ 2021-01-07 07:02:03.530249+00:00: manual__2021-01-07T07:02:03.530249+00:00, externally triggered: True> failed
   [2021-01-07 02:02:08,470] {dag_processing.py:399} WARNING - DagFileProcessorManager (PID=18779) exited with exit code 1 - re-launching
   [2021-01-07 02:02:08,475] {dag_processing.py:250} INFO - Launched DagFileProcessorManager with pid: 21398
   [2021-01-07 02:02:08,483] {settings.py:52} INFO - Configured default timezone Timezone('UTC')
   ```
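
   For context, the original `AttributeError` in this issue's title arises because Dask pickles the submitted callable, and functions defined inside another function ("local objects") cannot be pickled by the standard `pickle` module. The workaround discussed in the thread amounts to moving the callable to module level. A minimal sketch (not Airflow's actual code; the function names are illustrative) of the difference:

   ```python
   import pickle

   def make_local_callable():
       # A function defined inside another function is a "local object";
       # pickle cannot serialize it. This mirrors the original error with
       # DaskExecutor.execute_async's inner airflow_run.
       def airflow_run():
           return "ran"
       return airflow_run

   def module_level_run():
       # A module-level function pickles by reference to its qualified
       # name, so it can be shipped to Dask workers.
       return "ran"

   # Pickling the local function fails
   try:
       pickle.dumps(make_local_callable())
       local_picklable = True
   except (AttributeError, pickle.PicklingError):
       local_picklable = False

   # The module-level function round-trips through pickle fine
   module_picklable = pickle.loads(pickle.dumps(module_level_run))() == "ran"

   print(local_picklable, module_picklable)
   ```

   The `CalledProcessError(120, ...)` in the log above is a separate failure: the pickling problem is gone, but the `airflow tasks run` subprocess itself is exiting non-zero on the worker.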


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org