Posted to commits@airflow.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2020/04/17 00:59:00 UTC

[jira] [Commented] (AIRFLOW-6834) Fix flaky test test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past

    [ https://issues.apache.org/jira/browse/AIRFLOW-6834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085352#comment-17085352 ] 

ASF GitHub Bot commented on AIRFLOW-6834:
-----------------------------------------

stale[bot] commented on pull request #7470: [AIRFLOW-6834] Fix some flaky tests in test_scheduler_job.py
URL: https://github.com/apache/airflow/pull/7470
 
 
   
 


> Fix flaky test test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past
> --------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-6834
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6834
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: tests
>    Affects Versions: 1.10.9
>            Reporter: Qian Yu
>            Assignee: Qian Yu
>            Priority: Major
>
> test_scheduler_job.py has a few flaky tests. Some are already marked with pytest.mark.xfail, but this one is not marked as flaky even though it sometimes fails in Travis.
>  
> Example 1:
>  
> {code:python}
> ============================================================ FAILURES ============================================================
> _____________________ TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_0 ______________________
> a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor testMethod=test_dag_file_processor_process_task_instances_depends_on_past_0>,)
>     @wraps(func)
>     def standalone_func(*a):
> >       return func(*(a + p.args), **p.kwargs)
> /usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518:
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> self = <tests.jobs.test_scheduler_job.TestDagFileProcessor testMethod=test_dag_file_processor_process_task_instances_depends_on_past_0>
> state = None, start_date = None, end_date = None
>     @parameterized.expand([
>         [State.NONE, None, None],
>         [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
>          timezone.utcnow() - datetime.timedelta(minutes=15)],
>         [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30),
>          timezone.utcnow() - datetime.timedelta(minutes=15)],
>     ])
>     def test_dag_file_processor_process_task_instances_depends_on_past(self, state, start_date, end_date):
>         """
>         Test if _process_task_instances puts the right task instances into the
>         mock_list.
>         """
>         dag = DAG(
>             dag_id='test_scheduler_process_execute_task_depends_on_past',
>             start_date=DEFAULT_DATE,
>             default_args={
>                 'depends_on_past': True,
>             },
>         )
>         dag_task1 = DummyOperator(
>             task_id='dummy1',
>             dag=dag,
>             owner='airflow')
>         dag_task2 = DummyOperator(
>             task_id='dummy2',
>             dag=dag,
>             owner='airflow')
>         with create_session() as session:
>             orm_dag = DagModel(dag_id=dag.dag_id)
>             session.merge(orm_dag)
>         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
>         dag.clear()
>         dr = dag_file_processor.create_dag_run(dag)
>         self.assertIsNotNone(dr)
>         with create_session() as session:
>             tis = dr.get_task_instances(session=session)
>             for ti in tis:
>                 ti.state = state
>                 ti.start_date = start_date
>                 ti.end_date = end_date
>         ti_to_schedule = []
>         dag_file_processor._process_task_instances(dag, task_instances_list=ti_to_schedule)
> >       assert ti_to_schedule == [
>             (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
>             (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
>         ]
> E       AssertionError: assert [('test_scheduler_process_execute_task_depends_on_past',\n  'dummy2',\n  datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),\n  1),\n ('test_scheduler_process_execute_task_depends_on_past',\n  'dummy1',\n  datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),\n  1)] == [('test_scheduler_process_execute_task_depends_on_past',\n  'dummy1',\n  datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n  1),\n ('test_scheduler_process_execute_task_depends_on_past',\n  'dummy2',\n  datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n  1)]
> E         At index 0 diff: ('test_scheduler_process_execute_task_depends_on_past', 'dummy2', datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) != ('test_scheduler_process_execute_task_depends_on_past', 'dummy1', datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1)
> E         Full diff:
> E           [
> E            ('test_scheduler_process_execute_task_depends_on_past',
> E         -   'dummy2',
> E         ?         ^
> E         +   'dummy1',
> E         ?         ^
> E         -   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
> E         ?                                                       ----     ---------------------
> E         +   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E             1),
> E            ('test_scheduler_process_execute_task_depends_on_past',
> E         -   'dummy1',
> E         ?         ^
> E         +   'dummy2',
> E         ?         ^
> E         -   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
> E         ?                                                       ----     ---------------------
> E         +   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E             1),
> E           ]
> tests/jobs/test_scheduler_job.py:565: AssertionError
> _______________ TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry _______________
> a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor testMethod=test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry>,)
>     @wraps(func)
>     def standalone_func(*a):
> >       return func(*(a + p.args), **p.kwargs)
> /usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518:
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> self = <tests.jobs.test_scheduler_job.TestDagFileProcessor testMethod=test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry>
> state = 'up_for_retry', start_date = datetime.datetime(2020, 2, 18, 13, 26, 42, 916304, tzinfo=<Timezone [UTC]>)
> end_date = datetime.datetime(2020, 2, 18, 13, 41, 42, 916315, tzinfo=<Timezone [UTC]>)
>     @parameterized.expand([
>         [State.NONE, None, None],
>         [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
>          timezone.utcnow() - datetime.timedelta(minutes=15)],
>         [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30),
>          timezone.utcnow() - datetime.timedelta(minutes=15)],
>     ])
>     def test_dag_file_processor_process_task_instances_depends_on_past(self, state, start_date, end_date):
>         """
>         Test if _process_task_instances puts the right task instances into the
>         mock_list.
>         """
>         dag = DAG(
>             dag_id='test_scheduler_process_execute_task_depends_on_past',
>             start_date=DEFAULT_DATE,
>             default_args={
>                 'depends_on_past': True,
>             },
>         )
>         dag_task1 = DummyOperator(
>             task_id='dummy1',
>             dag=dag,
>             owner='airflow')
>         dag_task2 = DummyOperator(
>             task_id='dummy2',
>             dag=dag,
>             owner='airflow')
>         with create_session() as session:
>             orm_dag = DagModel(dag_id=dag.dag_id)
>             session.merge(orm_dag)
>         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
>         dag.clear()
>         dr = dag_file_processor.create_dag_run(dag)
>         self.assertIsNotNone(dr)
>         with create_session() as session:
>             tis = dr.get_task_instances(session=session)
>             for ti in tis:
>                 ti.state = state
>                 ti.start_date = start_date
>                 ti.end_date = end_date
>         ti_to_schedule = []
>         dag_file_processor._process_task_instances(dag, task_instances_list=ti_to_schedule)
> >       assert ti_to_schedule == [
>             (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
>             (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
>         ]
> E       AssertionError: assert [('test_scheduler_process_execute_task_depends_on_past',\n  'dummy2',\n  datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),\n  1),\n ('test_scheduler_process_execute_task_depends_on_past',\n  'dummy1',\n  datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),\n  1)] == [('test_scheduler_process_execute_task_depends_on_past',\n  'dummy1',\n  datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n  1),\n ('test_scheduler_process_execute_task_depends_on_past',\n  'dummy2',\n  datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n  1)]
> E         At index 0 diff: ('test_scheduler_process_execute_task_depends_on_past', 'dummy2', datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) != ('test_scheduler_process_execute_task_depends_on_past', 'dummy1', datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1)
> E         Full diff:
> E           [
> E            ('test_scheduler_process_execute_task_depends_on_past',
> E         -   'dummy2',
> E         ?         ^
> E         +   'dummy1',
> E         ?         ^
> E         -   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
> E         ?                                                       ----     ---------------------
> E         +   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E             1),
> E            ('test_scheduler_process_execute_task_depends_on_past',
> E         -   'dummy1',
> E         ?         ^
> E         +   'dummy2',
> E         ?         ^
> E         -   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
> E         ?                                                       ----     ---------------------
> E         +   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E             1),
> E           ]
> tests/jobs/test_scheduler_job.py:565: AssertionError
> ____________ TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule _____________
> a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor testMethod=test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule>,)
>     @wraps(func)
>     def standalone_func(*a):
> >       return func(*(a + p.args), **p.kwargs)
> /usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518:
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> self = <tests.jobs.test_scheduler_job.TestDagFileProcessor testMethod=test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule>
> state = 'up_for_reschedule', start_date = datetime.datetime(2020, 2, 18, 13, 26, 42, 916318, tzinfo=<Timezone [UTC]>)
> end_date = datetime.datetime(2020, 2, 18, 13, 41, 42, 916321, tzinfo=<Timezone [UTC]>)
>     @parameterized.expand([
>         [State.NONE, None, None],
>         [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
>          timezone.utcnow() - datetime.timedelta(minutes=15)],
>         [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30),
>          timezone.utcnow() - datetime.timedelta(minutes=15)],
>     ])
>     def test_dag_file_processor_process_task_instances_depends_on_past(self, state, start_date, end_date):
>         """
>         Test if _process_task_instances puts the right task instances into the
>         mock_list.
>         """
>         dag = DAG(
>             dag_id='test_scheduler_process_execute_task_depends_on_past',
>             start_date=DEFAULT_DATE,
>             default_args={
>                 'depends_on_past': True,
>             },
>         )
>         dag_task1 = DummyOperator(
>             task_id='dummy1',
>             dag=dag,
>             owner='airflow')
>         dag_task2 = DummyOperator(
>             task_id='dummy2',
>             dag=dag,
>             owner='airflow')
>         with create_session() as session:
>             orm_dag = DagModel(dag_id=dag.dag_id)
>             session.merge(orm_dag)
>         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
>         dag.clear()
>         dr = dag_file_processor.create_dag_run(dag)
>         self.assertIsNotNone(dr)
>         with create_session() as session:
>             tis = dr.get_task_instances(session=session)
>             for ti in tis:
>                 ti.state = state
>                 ti.start_date = start_date
>                 ti.end_date = end_date
>         ti_to_schedule = []
>         dag_file_processor._process_task_instances(dag, task_instances_list=ti_to_schedule)
> >       assert ti_to_schedule == [
>             (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
>             (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
>         ]
> E       AssertionError: assert [('test_scheduler_process_execute_task_depends_on_past',\n  'dummy2',\n  datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),\n  1),\n ('test_scheduler_process_execute_task_depends_on_past',\n  'dummy1',\n  datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),\n  1)] == [('test_scheduler_process_execute_task_depends_on_past',\n  'dummy1',\n  datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n  1),\n ('test_scheduler_process_execute_task_depends_on_past',\n  'dummy2',\n  datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n  1)]
> E         At index 0 diff: ('test_scheduler_process_execute_task_depends_on_past', 'dummy2', datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) != ('test_scheduler_process_execute_task_depends_on_past', 'dummy1', datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1)
> E         Full diff:
> E           [
> E            ('test_scheduler_process_execute_task_depends_on_past',
> E         -   'dummy2',
> E         ?         ^
> E         +   'dummy1',
> E         ?         ^
> E         -   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
> E         ?                                                       ----     ---------------------
> E         +   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E             1),
> E            ('test_scheduler_process_execute_task_depends_on_past',
> E         -   'dummy1',
> E         ?         ^
> E         +   'dummy2',
> E         ?         ^
> E         -   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
> E         ?                                                       ----     ---------------------
> E         +   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E             1),
> E           ]
> tests/jobs/test_scheduler_job.py:565: AssertionError
> ======================================================== warnings summary ========================================================
> /usr/local/lib/python3.6/site-packages/flask_babel/__init__.py:19
>   /usr/local/lib/python3.6/site-packages/flask_babel/__init__.py:19: DeprecationWarning: The import 'werkzeug.ImmutableDict' is deprecated and will be removed in Werkzeug 1.0. Use 'from werkzeug.datastructures import ImmutableDict' instead.
>     from werkzeug import ImmutableDict
> /usr/local/lib/python3.6/site-packages/flask_wtf/recaptcha/widgets.py:5
>   /usr/local/lib/python3.6/site-packages/flask_wtf/recaptcha/widgets.py:5: DeprecationWarning: The import 'werkzeug.url_encode' is deprecated and will be removed in Werkzeug 1.0. Use 'from werkzeug.urls import url_encode' instead.
>     from werkzeug import url_encode
> -- Docs: https://docs.pytest.org/en/latest/warnings.html
> ==================================================== short test summary info =====================================================
> XFAIL tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_retry_handling_job
>   This test is failing!
> XPASS tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_change_state_for_tis_without_dagrun The test is flaky with nondeterministic result
> FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_0
> FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry
> FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule
> =========================== 3 failed, 86 passed, 1 xfailed, 1 xpassed, 2 warnings in 61.44s (0:01:01) ============================
> {code}
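
In the Example 1 diff, the two task instances come back as dummy2 then dummy1, while the assertion expects dummy1 then dummy2. A minimal sketch of an order-insensitive comparison follows. It assumes the order of the returned entries is not something the test needs to guarantee, and it is not necessarily how pull request #7470 addresses the problem. The constants and the helper name same_items_any_order are hypothetical stand-ins, and the standard-library UTC timezone replaces the timezone objects shown in the log.

```python
import datetime

# Hypothetical stand-ins for values used by the real test; only the shape of
# the tuples (dag_id, task_id, execution_date, try_number) follows the log.
DAG_ID = "test_scheduler_process_execute_task_depends_on_past"
DEFAULT_DATE = datetime.datetime(2016, 1, 1, tzinfo=datetime.timezone.utc)
TRY_NUMBER = 1


def same_items_any_order(actual, expected):
    """Return True if both lists hold the same tuples, ignoring their order."""
    return sorted(actual) == sorted(expected)


expected = [
    (DAG_ID, "dummy1", DEFAULT_DATE, TRY_NUMBER),
    (DAG_ID, "dummy2", DEFAULT_DATE, TRY_NUMBER),
]
# Same entries, but in the order reported by the failing run.
actual = list(reversed(expected))

assert actual != expected                      # order-sensitive check fails
assert same_items_any_order(actual, expected)  # order-insensitive check passes
```

Inside a unittest.TestCase, self.assertCountEqual(actual, expected) performs a comparable order-insensitive check without a helper.
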
> Example 2:
> Run py.test in Breeze as shown below and {{test_start_and_terminate_run_as_user}} fails. The test implicitly relies on a call to {{settings.configure_orm()}} made in {{test_task_command.py}}, so it fails whenever {{test_task_command.py}} is not run.
> {code}
> py.test --with-db-init tests/task/task_runner/test_standard_task_runner.py
> XPASS tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_change_state_for_tis_without_dagrun The test is flaky with nondeterministic result
> XPASS tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_retry_handling_job This test is failing!
> ERROR tests/task/task_runner/test_standard_task_runner.py::TestStandardTaskRunner::test_start_and_terminate_run_as_user - sqlal...
> FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_0
> FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry
> FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule
> {code}


