You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "Simon (Jira)" <ji...@apache.org> on 2020/02/21 20:07:00 UTC

[jira] [Commented] (AIRFLOW-1972) Clear raises an exception

    [ https://issues.apache.org/jira/browse/AIRFLOW-1972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17042145#comment-17042145 ] 

Simon commented on AIRFLOW-1972:
--------------------------------

For anyone else finding their way here - I just encountered this problem even using 1.10.6.

Used the hint from [~mjerdely] to find a task with retry_wait erroneously set to an int value rather than timedelta(seconds=value).  Thanks!

> Clear raises an exception
> -------------------------
>
>                 Key: AIRFLOW-1972
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1972
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DAG
>    Affects Versions: 1.9.0
>            Reporter: Ievgen Aleinikov
>            Priority: Major
>             Fix For: 1.10.0
>
>
> Hi,
> We are facing an issue with the Clear function (in the UI and console) after updating to v1.9.0.
> {code}
>                           ____/ (  (    )   )  \___
>                          /( (  (  )   _    ))  )   )\
>                        ((     (   )(    )  )   (   )  )
>                      ((/  ( _(   )   (   _) ) (  () )  )
>                     ( (  ( (_)   ((    (   )  .((_ ) .  )_
>                    ( (  )    (      (  )    )   ) . ) (   )
>                   (  (   (  (   ) (  _  ( _) ).  ) . ) ) ( )
>                   ( (  (   ) (  )   (  ))     ) _)(   )  )  )
>                  ( (  ( \ ) (    (_  ( ) ( )  )   ) )  )) ( )
>                   (  (   (  (   (_ ( ) ( _    )  ) (  )  )   )
>                  ( (  ( (  (  )     (_  )  ) )  _)   ) _( ( )
>                   ((  (   )(    (     _    )   _) _(_ (  (_ )
>                    (_((__(_(__(( ( ( |  ) ) ) )_))__))_)___)
>                    ((__)        \\||lll|l||///          \_))
>                             (   /(/ (  )  ) )\   )
>                           (    ( ( ( | | ) ) )\   )
>                            (   /(| / ( )) ) ) )) )
>                          (     ( ((((_(|)_)))))     )
>                           (      ||\(|(|)|/||     )
>                         (        |(||(||)||||        )
>                           (     //|/l|||)|\\ \     )
>                         (/ / //  /|//||||\\  \ \  \ _)
> -------------------------------------------------------------------------------
> Node: ip-10-65-225-31.growth.internal.atlassian.com
> -------------------------------------------------------------------------------
> Traceback (most recent call last):
>   File "/var/lib/airflow/venv/local/lib/python2.7/site-packages/flask/app.py", line 1988, in wsgi_app
>     response = self.full_dispatch_request()
>   File "/var/lib/airflow/venv/local/lib/python2.7/site-packages/flask/app.py", line 1641, in full_dispatch_request
>     rv = self.handle_user_exception(e)
>   File "/var/lib/airflow/venv/local/lib/python2.7/site-packages/flask/app.py", line 1544, in handle_user_exception
>     reraise(exc_type, exc_value, tb)
>   File "/var/lib/airflow/venv/local/lib/python2.7/site-packages/flask/app.py", line 1639, in full_dispatch_request
>     rv = self.dispatch_request()
>   File "/var/lib/airflow/venv/local/lib/python2.7/site-packages/flask/app.py", line 1625, in dispatch_request
>     return self.view_functions[rule.endpoint](**req.view_args)
>   File "/var/lib/airflow/venv/local/lib/python2.7/site-packages/flask_admin/base.py", line 69, in inner
>     return self._run_view(f, *args, **kwargs)
>   File "/var/lib/airflow/venv/local/lib/python2.7/site-packages/flask_admin/base.py", line 368, in _run_view
>     return fn(self, *args, **kwargs)
>   File "/var/lib/airflow/venv/local/lib/python2.7/site-packages/flask_login.py", line 755, in decorated_view
>     return func(*args, **kwargs)
>   File "/var/lib/airflow/venv/local/lib/python2.7/site-packages/airflow/www/utils.py", line 262, in wrapper
>     return f(*args, **kwargs)
>   File "/var/lib/airflow/venv/local/lib/python2.7/site-packages/airflow/www/utils.py", line 309, in wrapper
>     return f(*args, **kwargs)
>   File "/var/lib/airflow/venv/local/lib/python2.7/site-packages/airflow/www/views.py", line 989, in clear
>     include_upstream=upstream)
>   File "/var/lib/airflow/venv/local/lib/python2.7/site-packages/airflow/models.py", line 3527, in sub_dag
>     dag = copy.deepcopy(self)
>   File "/usr/lib/python2.7/copy.py", line 174, in deepcopy
>     y = copier(memo)
>   File "/var/lib/airflow/venv/local/lib/python2.7/site-packages/airflow/models.py", line 3512, in __deepcopy__
>     setattr(result, k, copy.deepcopy(v, memo))
>   File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
>     y = copier(x, memo)
>   File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/usr/lib/python2.7/copy.py", line 174, in deepcopy
>     y = copier(memo)
>   File "/var/lib/airflow/venv/local/lib/python2.7/site-packages/airflow/models.py", line 2437, in __deepcopy__
>     setattr(result, k, copy.deepcopy(v, memo))
>   File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
>     y = _reconstruct(x, rv, 1, memo)
>   File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
>     state = deepcopy(state, memo)
>   File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
>     y = copier(x, memo)
>   File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
>     y = _reconstruct(x, rv, 1, memo)
>   File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
>     state = deepcopy(state, memo)
>   File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
>     y = copier(x, memo)
>   File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
>     y = copier(x, memo)
>   File "/usr/lib/python2.7/copy.py", line 230, in _deepcopy_list
>     y.append(deepcopy(a, memo))
>   File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
>     y = _reconstruct(x, rv, 1, memo)
>   File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
>     state = deepcopy(state, memo)
>   File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
>     y = copier(x, memo)
>   File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
>     y = _reconstruct(x, rv, 1, memo)
>   File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
>     state = deepcopy(state, memo)
>   File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
>     y = copier(x, memo)
>   File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/usr/lib/python2.7/copy.py", line 182, in deepcopy
>     rv = reductor(2)
> TypeError: can't pickle thread.lock objects
> {code}
> This happens while clearing execution for the following DAG (it's just a dummy implementation for the bug reproduction):
> {code}
> import airflow
> import airflow.operators
> from airflow.operators.subdag_operator import SubDagOperator
> import datetime
> import logging
> JOB_NAME = 'test_subdag_v3'
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': True,
>     'start_date': datetime.datetime(2018, 1, 7),
>     'schedule_interval': '0 3 * * *'
> }
> dag = airflow.DAG(JOB_NAME, default_args=default_args)
> def create_subdag(parent_dag_name, subdag_type, args, schedule_interval):
>     dag_subdag = airflow.DAG(
>         dag_id='%s.%s' % (parent_dag_name, subdag_type),
>         default_args=args,
>         start_date=args['start_date'],
>         schedule_interval=schedule_interval,
>     )
>     subdag_start = airflow.operators.DummyOperator(
>         task_id='subdag_start_%s' % (subdag_type),
>         dag=dag_subdag
>     )
>     subdag_end = airflow.operators.DummyOperator(
>         task_id='subdag_end_%s' % (subdag_type),
>         dag=dag_subdag
>     )
>     dummy = airflow.operators.DummyOperator(
>         task_id='dummy_%s' % (subdag_type),
>         dag=dag_subdag
>     )
>     subdag_start.set_downstream(dummy)
>     subdag_end.set_upstream(dummy)
>     return dag_subdag
> start = airflow.operators.DummyOperator(
>     task_id='start',
>     dag=dag
> )
> end = airflow.operators.DummyOperator(
>     task_id='end',
>     dag=dag
> )
> failure = airflow.operators.DummyOperator(
>     task_id='failure',
>     trigger_rule='one_failed',
>     dag=dag
> )
> current_view = SubDagOperator(
>     subdag=create_subdag(
>         parent_dag_name=JOB_NAME,
>         subdag_type='current_view',
>         args=dag.default_args,
>         schedule_interval='@once'),
>     task_id='current_view',
>     dag=dag,
> )
> dimensional_model = SubDagOperator(
>     subdag=create_subdag(
>         parent_dag_name=JOB_NAME,
>         subdag_type='dimensional_model',
>         args=dag.default_args,
>         schedule_interval='@once'),
>     task_id='dimensional_model',
>     dag=dag,
> )
> start.set_downstream(current_view)
> current_view.set_downstream(dimensional_model)
> dimensional_model.set_downstream(end)
> failure.set_upstream(current_view)
> failure.set_upstream(dimensional_model)
> {code}
> This DAG works on v1.8.0.
> Could you please advise whether this is expected (i.e. a problem in the DAG) or whether it's an Airflow issue?
> Kind regards,
> Ievgen.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)