Posted to commits@airflow.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2020/02/20 03:47:00 UTC

[jira] [Commented] (AIRFLOW-6834) Fix flaky test test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past

    [ https://issues.apache.org/jira/browse/AIRFLOW-6834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17040604#comment-17040604 ] 

ASF GitHub Bot commented on AIRFLOW-6834:
-----------------------------------------

yuqian90 commented on pull request #7470: [AIRFLOW-6834] Fix flaky test_scheduler_job by sorting TaskInstance
URL: https://github.com/apache/airflow/pull/7470
 
 
   Fix a flaky test that fails because `TaskInstance` objects are returned in a non-deterministic order:
   ```
   test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past
   ```
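One way to make the comparison deterministic, in the spirit of the PR title, is to sort both sides before asserting. The sketch below is a stand-alone approximation, not the actual patch in #7470. It makes three simplifications:

- The task instance keys are stubbed as plain `(dag_id, task_id, execution_date, try_number)` tuples, copied from the failing run.
- `DEFAULT_DATE` and `TRY_NUMBER` are local stand-ins for the test module's constants.
- The helper `assert_same_task_instances` is hypothetical.

```python
from datetime import datetime, timezone

# Stand-ins for the constants used by the test module (assumed values,
# matching what the failing run printed).
DAG_ID = "test_scheduler_process_execute_task_depends_on_past"
DEFAULT_DATE = datetime(2016, 1, 1, tzinfo=timezone.utc)
TRY_NUMBER = 1


def assert_same_task_instances(actual, expected):
    """Hypothetical helper: compare task instance keys while ignoring order.

    Sorting both lists gives a deterministic comparison even when the
    database returns TaskInstance rows in an arbitrary order.
    """
    assert sorted(actual) == sorted(expected)


# Order observed in the failing Travis run: dummy2 came back before dummy1.
ti_to_schedule = [
    (DAG_ID, "dummy2", DEFAULT_DATE, TRY_NUMBER),
    (DAG_ID, "dummy1", DEFAULT_DATE, TRY_NUMBER),
]
expected = [
    (DAG_ID, "dummy1", DEFAULT_DATE, TRY_NUMBER),
    (DAG_ID, "dummy2", DEFAULT_DATE, TRY_NUMBER),
]

# The original, order-sensitive check fails on this input.
print(ti_to_schedule == expected)
# The sorted check passes regardless of row order.
assert_same_task_instances(ti_to_schedule, expected)
```

Sorting still checks that both lists hold the same elements, including duplicates, while ignoring their order. `unittest.TestCase.assertCountEqual` from the standard library is an alternative with the same effect.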
   ---
   Issue link: WILL BE INSERTED BY [boring-cyborg](https://github.com/kaxil/boring-cyborg)
   
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Commit message/PR title starts with `[AIRFLOW-NNNN]`. AIRFLOW-NNNN = JIRA ID<sup>*</sup>
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   <sup>*</sup> For document-only changes commit message can start with `[AIRFLOW-XXXX]`.
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Fix flaky test test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past
> --------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-6834
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6834
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: tests
>    Affects Versions: 1.10.9
>            Reporter: Qian Yu
>            Priority: Major
>
> test_scheduler_job.py has a few flaky tests. Some are marked with pytest.mark.xfail, but this one is not marked at all, and it fails intermittently on Travis CI. For example:
>  
> {code:python}
> ============================================================ FAILURES ============================================================
> _____________________ TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_0 ______________________
> a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor testMethod=test_dag_file_processor_process_task_instances_depends_on_past_0>,)
>     @wraps(func)
>     def standalone_func(*a):
> >       return func(*(a + p.args), **p.kwargs)
> /usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518:
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> self = <tests.jobs.test_scheduler_job.TestDagFileProcessor testMethod=test_dag_file_processor_process_task_instances_depends_on_past_0>
> state = None, start_date = None, end_date = None
>     @parameterized.expand([
>         [State.NONE, None, None],
>         [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
>          timezone.utcnow() - datetime.timedelta(minutes=15)],
>         [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30),
>          timezone.utcnow() - datetime.timedelta(minutes=15)],
>     ])
>     def test_dag_file_processor_process_task_instances_depends_on_past(self, state, start_date, end_date):
>         """
>         Test if _process_task_instances puts the right task instances into the
>         mock_list.
>         """
>         dag = DAG(
>             dag_id='test_scheduler_process_execute_task_depends_on_past',
>             start_date=DEFAULT_DATE,
>             default_args={
>                 'depends_on_past': True,
>             },
>         )
>         dag_task1 = DummyOperator(
>             task_id='dummy1',
>             dag=dag,
>             owner='airflow')
>         dag_task2 = DummyOperator(
>             task_id='dummy2',
>             dag=dag,
>             owner='airflow')
>         with create_session() as session:
>             orm_dag = DagModel(dag_id=dag.dag_id)
>             session.merge(orm_dag)
>         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
>         dag.clear()
>         dr = dag_file_processor.create_dag_run(dag)
>         self.assertIsNotNone(dr)
>         with create_session() as session:
>             tis = dr.get_task_instances(session=session)
>             for ti in tis:
>                 ti.state = state
>                 ti.start_date = start_date
>                 ti.end_date = end_date
>         ti_to_schedule = []
>         dag_file_processor._process_task_instances(dag, task_instances_list=ti_to_schedule)
> >       assert ti_to_schedule == [
>             (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
>             (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
>         ]
> E       AssertionError: assert [('test_scheduler_process_execute_task_depends_on_past',\n  'dummy2',\n  datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),\n  1),\n ('test_scheduler_process_execute_task_depends_on_past',\n  'dummy1',\n  datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),\n  1)] == [('test_scheduler_process_execute_task_depends_on_past',\n  'dummy1',\n  datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n  1),\n ('test_scheduler_process_execute_task_depends_on_past',\n  'dummy2',\n  datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n  1)]
> E         At index 0 diff: ('test_scheduler_process_execute_task_depends_on_past', 'dummy2', datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) != ('test_scheduler_process_execute_task_depends_on_past', 'dummy1', datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1)
> E         Full diff:
> E           [
> E            ('test_scheduler_process_execute_task_depends_on_past',
> E         -   'dummy2',
> E         ?         ^
> E         +   'dummy1',
> E         ?         ^
> E         -   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
> E         ?                                                       ----     ---------------------
> E         +   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E             1),
> E            ('test_scheduler_process_execute_task_depends_on_past',
> E         -   'dummy1',
> E         ?         ^
> E         +   'dummy2',
> E         ?         ^
> E         -   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
> E         ?                                                       ----     ---------------------
> E         +   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E             1),
> E           ]
> tests/jobs/test_scheduler_job.py:565: AssertionError
> _______________ TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry _______________
> a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor testMethod=test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry>,)
>     @wraps(func)
>     def standalone_func(*a):
> >       return func(*(a + p.args), **p.kwargs)
> /usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518:
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> self = <tests.jobs.test_scheduler_job.TestDagFileProcessor testMethod=test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry>
> state = 'up_for_retry', start_date = datetime.datetime(2020, 2, 18, 13, 26, 42, 916304, tzinfo=<Timezone [UTC]>)
> end_date = datetime.datetime(2020, 2, 18, 13, 41, 42, 916315, tzinfo=<Timezone [UTC]>)
>     @parameterized.expand([
>         [State.NONE, None, None],
>         [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
>          timezone.utcnow() - datetime.timedelta(minutes=15)],
>         [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30),
>          timezone.utcnow() - datetime.timedelta(minutes=15)],
>     ])
>     def test_dag_file_processor_process_task_instances_depends_on_past(self, state, start_date, end_date):
>         """
>         Test if _process_task_instances puts the right task instances into the
>         mock_list.
>         """
>         dag = DAG(
>             dag_id='test_scheduler_process_execute_task_depends_on_past',
>             start_date=DEFAULT_DATE,
>             default_args={
>                 'depends_on_past': True,
>             },
>         )
>         dag_task1 = DummyOperator(
>             task_id='dummy1',
>             dag=dag,
>             owner='airflow')
>         dag_task2 = DummyOperator(
>             task_id='dummy2',
>             dag=dag,
>             owner='airflow')
>         with create_session() as session:
>             orm_dag = DagModel(dag_id=dag.dag_id)
>             session.merge(orm_dag)
>         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
>         dag.clear()
>         dr = dag_file_processor.create_dag_run(dag)
>         self.assertIsNotNone(dr)
>         with create_session() as session:
>             tis = dr.get_task_instances(session=session)
>             for ti in tis:
>                 ti.state = state
>                 ti.start_date = start_date
>                 ti.end_date = end_date
>         ti_to_schedule = []
>         dag_file_processor._process_task_instances(dag, task_instances_list=ti_to_schedule)
> >       assert ti_to_schedule == [
>             (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
>             (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
>         ]
> E       AssertionError: assert [('test_scheduler_process_execute_task_depends_on_past',\n  'dummy2',\n  datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),\n  1),\n ('test_scheduler_process_execute_task_depends_on_past',\n  'dummy1',\n  datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),\n  1)] == [('test_scheduler_process_execute_task_depends_on_past',\n  'dummy1',\n  datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n  1),\n ('test_scheduler_process_execute_task_depends_on_past',\n  'dummy2',\n  datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n  1)]
> E         At index 0 diff: ('test_scheduler_process_execute_task_depends_on_past', 'dummy2', datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) != ('test_scheduler_process_execute_task_depends_on_past', 'dummy1', datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1)
> E         Full diff:
> E           [
> E            ('test_scheduler_process_execute_task_depends_on_past',
> E         -   'dummy2',
> E         ?         ^
> E         +   'dummy1',
> E         ?         ^
> E         -   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
> E         ?                                                       ----     ---------------------
> E         +   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E             1),
> E            ('test_scheduler_process_execute_task_depends_on_past',
> E         -   'dummy1',
> E         ?         ^
> E         +   'dummy2',
> E         ?         ^
> E         -   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
> E         ?                                                       ----     ---------------------
> E         +   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E             1),
> E           ]
> tests/jobs/test_scheduler_job.py:565: AssertionError
> ____________ TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule _____________
> a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor testMethod=test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule>,)
>     @wraps(func)
>     def standalone_func(*a):
> >       return func(*(a + p.args), **p.kwargs)
> /usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518:
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> self = <tests.jobs.test_scheduler_job.TestDagFileProcessor testMethod=test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule>
> state = 'up_for_reschedule', start_date = datetime.datetime(2020, 2, 18, 13, 26, 42, 916318, tzinfo=<Timezone [UTC]>)
> end_date = datetime.datetime(2020, 2, 18, 13, 41, 42, 916321, tzinfo=<Timezone [UTC]>)
>     @parameterized.expand([
>         [State.NONE, None, None],
>         [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
>          timezone.utcnow() - datetime.timedelta(minutes=15)],
>         [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30),
>          timezone.utcnow() - datetime.timedelta(minutes=15)],
>     ])
>     def test_dag_file_processor_process_task_instances_depends_on_past(self, state, start_date, end_date):
>         """
>         Test if _process_task_instances puts the right task instances into the
>         mock_list.
>         """
>         dag = DAG(
>             dag_id='test_scheduler_process_execute_task_depends_on_past',
>             start_date=DEFAULT_DATE,
>             default_args={
>                 'depends_on_past': True,
>             },
>         )
>         dag_task1 = DummyOperator(
>             task_id='dummy1',
>             dag=dag,
>             owner='airflow')
>         dag_task2 = DummyOperator(
>             task_id='dummy2',
>             dag=dag,
>             owner='airflow')
>         with create_session() as session:
>             orm_dag = DagModel(dag_id=dag.dag_id)
>             session.merge(orm_dag)
>         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
>         dag.clear()
>         dr = dag_file_processor.create_dag_run(dag)
>         self.assertIsNotNone(dr)
>         with create_session() as session:
>             tis = dr.get_task_instances(session=session)
>             for ti in tis:
>                 ti.state = state
>                 ti.start_date = start_date
>                 ti.end_date = end_date
>         ti_to_schedule = []
>         dag_file_processor._process_task_instances(dag, task_instances_list=ti_to_schedule)
> >       assert ti_to_schedule == [
>             (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
>             (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
>         ]
> E       AssertionError: assert [('test_scheduler_process_execute_task_depends_on_past',\n  'dummy2',\n  datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),\n  1),\n ('test_scheduler_process_execute_task_depends_on_past',\n  'dummy1',\n  datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),\n  1)] == [('test_scheduler_process_execute_task_depends_on_past',\n  'dummy1',\n  datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n  1),\n ('test_scheduler_process_execute_task_depends_on_past',\n  'dummy2',\n  datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n  1)]
> E         At index 0 diff: ('test_scheduler_process_execute_task_depends_on_past', 'dummy2', datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) != ('test_scheduler_process_execute_task_depends_on_past', 'dummy1', datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1)
> E         Full diff:
> E           [
> E            ('test_scheduler_process_execute_task_depends_on_past',
> E         -   'dummy2',
> E         ?         ^
> E         +   'dummy1',
> E         ?         ^
> E         -   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
> E         ?                                                       ----     ---------------------
> E         +   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E             1),
> E            ('test_scheduler_process_execute_task_depends_on_past',
> E         -   'dummy1',
> E         ?         ^
> E         +   'dummy2',
> E         ?         ^
> E         -   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
> E         ?                                                       ----     ---------------------
> E         +   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E             1),
> E           ]
> tests/jobs/test_scheduler_job.py:565: AssertionError
> ======================================================== warnings summary ========================================================
> /usr/local/lib/python3.6/site-packages/flask_babel/__init__.py:19
>   /usr/local/lib/python3.6/site-packages/flask_babel/__init__.py:19: DeprecationWarning: The import 'werkzeug.ImmutableDict' is deprecated and will be removed in Werkzeug 1.0. Use 'from werkzeug.datastructures import ImmutableDict' instead.
>     from werkzeug import ImmutableDict
> /usr/local/lib/python3.6/site-packages/flask_wtf/recaptcha/widgets.py:5
>   /usr/local/lib/python3.6/site-packages/flask_wtf/recaptcha/widgets.py:5: DeprecationWarning: The import 'werkzeug.url_encode' is deprecated and will be removed in Werkzeug 1.0. Use 'from werkzeug.urls import url_encode' instead.
>     from werkzeug import url_encode
> -- Docs: https://docs.pytest.org/en/latest/warnings.html
> ==================================================== short test summary info =====================================================
> XFAIL tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_retry_handling_job
>   This test is failing!
> XPASS tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_change_state_for_tis_without_dagrun The test is flaky with nondeterministic result
> FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_0
> FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry
> FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule
> =========================== 3 failed, 86 passed, 1 xfailed, 1 xpassed, 2 warnings in 61.44s (0:01:01) ============================
> {code}
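The full diff above also flags the `tzinfo` objects: `TimezoneInfo [UTC, GMT, +00:00:00, STD]` on the left, `Timezone [UTC]` on the right. These reprs appear to come from pendulum, which Airflow 1.10 uses for timezone handling. That difference alone does not make the tuples unequal, because aware `datetime` values compare by the instant they represent, not by which `tzinfo` object they carry. Only the `task_id` order differs. This is consistent with the test passing on runs where the rows happen to come back in order.

A stdlib-only sketch follows. The class `OtherUTC` is a hypothetical second UTC `tzinfo` implementation standing in for the pendulum types.

```python
from datetime import datetime, timedelta, timezone, tzinfo


class OtherUTC(tzinfo):
    """Hypothetical stand-in for a second, distinct UTC tzinfo implementation."""

    def utcoffset(self, dt):
        return timedelta(0)

    def dst(self, dt):
        return timedelta(0)

    def tzname(self, dt):
        return "UTC"


a = datetime(2016, 1, 1, tzinfo=timezone.utc)
b = datetime(2016, 1, 1, tzinfo=OtherUTC())

print(a.tzinfo is b.tzinfo)  # False: two different tzinfo objects
print(a == b)  # True: aware datetimes are compared as UTC instants
# Tuples holding these datetimes are therefore equal too.
print(("dag", "dummy1", a, 1) == ("dag", "dummy1", b, 1))  # True
```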



--
This message was sent by Atlassian Jira
(v8.3.4#803005)